1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.nio;
16
17 import java.io.IOException;
18 import java.net.InetSocketAddress;
19 import java.net.Socket;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23
24 import org.mortbay.io.Connection;
25 import org.mortbay.io.nio.SelectChannelEndPoint;
26 import org.mortbay.io.nio.SelectorManager;
27 import org.mortbay.io.nio.SelectorManager.SelectSet;
28 import org.mortbay.jetty.HttpConnection;
29 import org.mortbay.jetty.Request;
30 import org.mortbay.jetty.RetryRequest;
31 import org.mortbay.log.Log;
32 import org.mortbay.thread.Timeout;
33 import org.mortbay.util.ajax.Continuation;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 public class SelectChannelConnector extends AbstractNIOConnector
65 {
66 protected transient ServerSocketChannel _acceptChannel;
67 private long _lowResourcesConnections;
68 private long _lowResourcesMaxIdleTime;
69
70 private SelectorManager _manager = new SelectorManager()
71 {
72 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
73 {
74
75 SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
76 if (channel==null)
77 return null;
78 channel.configureBlocking(false);
79 Socket socket = channel.socket();
80 configure(socket);
81 return channel;
82 }
83
84 public boolean dispatch(Runnable task) throws IOException
85 {
86 return getThreadPool().dispatch(task);
87 }
88
89 protected void endPointClosed(SelectChannelEndPoint endpoint)
90 {
91
92 connectionClosed((HttpConnection)endpoint.getConnection());
93 }
94
95 protected void endPointOpened(SelectChannelEndPoint endpoint)
96 {
97
98 connectionOpened((HttpConnection)endpoint.getConnection());
99 }
100
101 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
102 {
103 return SelectChannelConnector.this.newConnection(channel,endpoint);
104 }
105
106 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
107 {
108 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
109 }
110 };
111
112
113
114
115
116
/**
 * Creates a connector; no channel is opened until {@link #open()} is called.
 */
public SelectChannelConnector()
{
    super();
}
120
121
122 public void accept(int acceptorID) throws IOException
123 {
124 _manager.doSelect(acceptorID);
125 }
126
127
128 public void close() throws IOException
129 {
130 synchronized(this)
131 {
132 if(_manager.isRunning())
133 {
134 try
135 {
136 _manager.stop();
137 }
138 catch (Exception e)
139 {
140 Log.warn(e);
141 }
142 }
143 if (_acceptChannel != null)
144 _acceptChannel.close();
145 _acceptChannel = null;
146 }
147 }
148
149
150 public void customize(org.mortbay.io.EndPoint endpoint, Request request) throws IOException
151 {
152 ConnectorEndPoint cep = ((ConnectorEndPoint)endpoint);
153 cep.cancelIdle();
154 request.setTimeStamp(cep.getSelectSet().getNow());
155 super.customize(endpoint, request);
156 }
157
158
159 public void persist(org.mortbay.io.EndPoint endpoint) throws IOException
160 {
161 ((ConnectorEndPoint)endpoint).scheduleIdle();
162 super.persist(endpoint);
163 }
164
165
166 public Object getConnection()
167 {
168 return _acceptChannel;
169 }
170
171
172
173
174
175
176
177
178 public boolean getDelaySelectKeyUpdate()
179 {
180 return _manager.isDelaySelectKeyUpdate();
181 }
182
183
184 public int getLocalPort()
185 {
186 synchronized(this)
187 {
188 if (_acceptChannel==null || !_acceptChannel.isOpen())
189 return -1;
190 return _acceptChannel.socket().getLocalPort();
191 }
192 }
193
194
195
196
197
198 public Continuation newContinuation()
199 {
200 return new RetryContinuation();
201 }
202
203
204 public void open() throws IOException
205 {
206 synchronized(this)
207 {
208 if (_acceptChannel == null)
209 {
210
211 _acceptChannel = ServerSocketChannel.open();
212
213
214 _acceptChannel.socket().setReuseAddress(getReuseAddress());
215 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
216 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
217
218
219 _acceptChannel.configureBlocking(false);
220
221 }
222 }
223 }
224
225
226
227
228
229
230
231
232 public void setDelaySelectKeyUpdate(boolean delay)
233 {
234 _manager.setDelaySelectKeyUpdate(delay);
235 }
236
237
238 public void setMaxIdleTime(int maxIdleTime)
239 {
240 _manager.setMaxIdleTime(maxIdleTime);
241 super.setMaxIdleTime(maxIdleTime);
242 }
243
244
245
246
247
248
249 public long getLowResourcesConnections()
250 {
251 return _lowResourcesConnections;
252 }
253
254
255
256
257
258
259
260
261 public void setLowResourcesConnections(long lowResourcesConnections)
262 {
263 _lowResourcesConnections=lowResourcesConnections;
264 }
265
266
267
268
269
270 public long getLowResourcesMaxIdleTime()
271 {
272 return _lowResourcesMaxIdleTime;
273 }
274
275
276
277
278
279
280
281
282
283
284 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
285 {
286 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
287 super.setLowResourceMaxIdleTime((int)lowResourcesMaxIdleTime);
288 }
289
290
291
292
293
294
295
296
297
298 public void setLowResourceMaxIdleTime(int lowResourcesMaxIdleTime)
299 {
300 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
301 super.setLowResourceMaxIdleTime(lowResourcesMaxIdleTime);
302 }
303
304
305
306
307
308 protected void doStart() throws Exception
309 {
310 _manager.setSelectSets(getAcceptors());
311 _manager.setMaxIdleTime(getMaxIdleTime());
312 _manager.setLowResourcesConnections(getLowResourcesConnections());
313 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
314 _manager.start();
315 open();
316 _manager.register(_acceptChannel);
317 super.doStart();
318 }
319
320
321
322
323
324 protected void doStop() throws Exception
325 {
326 super.doStop();
327 }
328
329
330 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
331 {
332 return new ConnectorEndPoint(channel,selectSet,key);
333 }
334
335
336 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
337 {
338 return new HttpConnection(SelectChannelConnector.this,endpoint,getServer());
339 }
340
341
342
343
344 public static class ConnectorEndPoint extends SelectChannelEndPoint
345 {
346 public ConnectorEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
347 {
348 super(channel,selectSet,key);
349 scheduleIdle();
350 }
351
352 public void close() throws IOException
353 {
354 Connection con=getConnection();
355 if (con instanceof HttpConnection)
356 {
357 RetryContinuation continuation = (RetryContinuation) ((HttpConnection)getConnection()).getRequest().getContinuation();
358 if (continuation != null && continuation.isPending())
359 continuation.reset();
360 }
361
362 super.close();
363 }
364
365
366 public void undispatch()
367 {
368 Connection con=getConnection();
369 if (con instanceof HttpConnection)
370 {
371 RetryContinuation continuation = (RetryContinuation) ((HttpConnection)getConnection()).getRequest().getContinuation();
372
373 if (continuation != null)
374 {
375
376 Log.debug("continuation {}", continuation);
377 if (continuation.undispatch())
378 super.undispatch();
379 }
380 else
381 {
382 super.undispatch();
383 }
384 }
385 else
386 super.undispatch();
387 }
388 }
389
390
391
392
393 public static class RetryContinuation extends Timeout.Task implements Continuation, Runnable
394 {
395 SelectChannelEndPoint _endPoint=(SelectChannelEndPoint)HttpConnection.getCurrentConnection().getEndPoint();
396 boolean _new = true;
397 Object _object;
398 boolean _pending = false;
399 boolean _resumed = false;
400 boolean _parked =false;
401 RetryRequest _retry;
402 long _timeout;
403
404
405 public Object getObject()
406 {
407 return _object;
408 }
409
410 public long getTimeout()
411 {
412 return _timeout;
413 }
414
415 public boolean isNew()
416 {
417 return _new;
418 }
419
420 public boolean isPending()
421 {
422 return _pending;
423 }
424
425 public boolean isResumed()
426 {
427 return _resumed;
428 }
429
430 public void reset()
431 {
432 synchronized (this)
433 {
434 _resumed = false;
435 _pending = false;
436 _parked = false;
437 }
438
439 synchronized (_endPoint.getSelectSet())
440 {
441 this.cancel();
442 }
443 }
444
445 public boolean suspend(long timeout)
446 {
447 boolean resumed=false;
448 synchronized (this)
449 {
450 resumed=_resumed;
451 _resumed=false;
452 _new = false;
453 if (!_pending && !resumed && timeout >= 0)
454 {
455 _pending=true;
456 _parked = false;
457 _timeout = timeout;
458 if (_retry==null)
459 _retry = new RetryRequest();
460 throw _retry;
461 }
462
463
464
465 _resumed = false;
466 _pending = false;
467 _parked =false;
468 }
469
470 synchronized (_endPoint.getSelectSet())
471 {
472 this.cancel();
473 }
474
475 return resumed;
476 }
477
478 public void resume()
479 {
480 boolean redispatch=false;
481 synchronized (this)
482 {
483 if (_pending && !isExpired())
484 {
485 _resumed = true;
486 redispatch=_parked;
487 _parked=false;
488 }
489 }
490
491 if (redispatch)
492 {
493 SelectSet selectSet = _endPoint.getSelectSet();
494
495 synchronized (selectSet)
496 {
497 this.cancel();
498 }
499
500 _endPoint.scheduleIdle();
501 selectSet.addChange(this);
502 selectSet.wakeup();
503 }
504 }
505
506 public void expire()
507 {
508 boolean redispatch=false;
509 synchronized (this)
510 {
511 redispatch=_parked && _pending && !_resumed;
512 _parked=false;
513 }
514 if (redispatch)
515 {
516 _endPoint.scheduleIdle();
517 _endPoint.getSelectSet().addChange(this);
518 _endPoint.getSelectSet().wakeup();
519 }
520 }
521
522
523 public void run()
524 {
525 _endPoint.run();
526 }
527
528
529
530
531 public boolean undispatch()
532 {
533 boolean redispatch=false;
534
535 synchronized (this)
536 {
537 if (!_pending)
538 return true;
539
540 redispatch=isExpired() || _resumed;
541 _parked=!redispatch;
542 }
543
544 if (redispatch)
545 {
546 _endPoint.scheduleIdle();
547 _endPoint.getSelectSet().addChange(this);
548 }
549 else if (_timeout>0)
550 _endPoint.getSelectSet().scheduleTimeout(this,_timeout);
551
552 _endPoint.getSelectSet().wakeup();
553 return false;
554 }
555
556 public void setObject(Object object)
557 {
558 _object = object;
559 }
560
561 public String toString()
562 {
563 synchronized (this)
564 {
565 return "RetryContinuation@"+hashCode()+
566 (_new?",new":"")+
567 (_pending?",pending":"")+
568 (_resumed?",resumed":"")+
569 (isExpired()?",expired":"")+
570 (_parked?",parked":"");
571 }
572 }
573
574 }
575
576 }