1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.Selector;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26
27 import org.mortbay.component.AbstractLifeCycle;
28 import org.mortbay.component.LifeCycle;
29 import org.mortbay.io.Connection;
30 import org.mortbay.io.EndPoint;
31 import org.mortbay.log.Log;
32 import org.mortbay.thread.Timeout;
33
34
35
36
37
38
39
40
41
42
43 public abstract class SelectorManager extends AbstractLifeCycle
44 {
45 private boolean _delaySelectKeyUpdate=true;
46 private long _maxIdleTime;
47 private long _lowResourcesConnections;
48 private long _lowResourcesMaxIdleTime;
49 private transient SelectSet[] _selectSet;
50 private int _selectSets=1;
51 private volatile int _set;
52
53
54
55
56
57
58
59 public void setMaxIdleTime(long maxIdleTime)
60 {
61 _maxIdleTime=maxIdleTime;
62 }
63
64
65
66
67
68 public void setSelectSets(int selectSets)
69 {
70 long lrc = _lowResourcesConnections * _selectSets;
71 _selectSets=selectSets;
72 _lowResourcesConnections=lrc/_selectSets;
73 }
74
75
76
77
78
79 public long getMaxIdleTime()
80 {
81 return _maxIdleTime;
82 }
83
84
85
86
87
88 public int getSelectSets()
89 {
90 return _selectSets;
91 }
92
93
94
95
96
97 public boolean isDelaySelectKeyUpdate()
98 {
99 return _delaySelectKeyUpdate;
100 }
101
102
103
104
105
106
107
108 public void register(SocketChannel channel, Object att) throws IOException
109 {
110 int s=_set++;
111 s=s%_selectSets;
112 SelectSet[] sets=_selectSet;
113 if (sets!=null)
114 {
115 SelectSet set=sets[s];
116 set.addChange(channel,att);
117 set.wakeup();
118 }
119 }
120
121
122
123
124
125
126
127 public void register(ServerSocketChannel acceptChannel) throws IOException
128 {
129 int s=_set++;
130 s=s%_selectSets;
131 SelectSet set=_selectSet[s];
132 set.addChange(acceptChannel);
133 set.wakeup();
134 }
135
136
137
138
139
140 public long getLowResourcesConnections()
141 {
142 return _lowResourcesConnections*_selectSets;
143 }
144
145
146
147
148
149
150
151
152 public void setLowResourcesConnections(long lowResourcesConnections)
153 {
154 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
155 }
156
157
158
159
160
161 public long getLowResourcesMaxIdleTime()
162 {
163 return _lowResourcesMaxIdleTime;
164 }
165
166
167
168
169
170
171 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
172 {
173 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
174 }
175
176
177
178
179
180
181 public void doSelect(int acceptorID) throws IOException
182 {
183 SelectSet[] sets= _selectSet;
184 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
185 sets[acceptorID].doSelect();
186 }
187
188
189
190
191
192
193 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
194 {
195 _delaySelectKeyUpdate=delaySelectKeyUpdate;
196 }
197
198
199
200
201
202
203
204 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
205
206
207 public abstract boolean dispatch(Runnable task) throws IOException;
208
209
210
211
212
213 protected void doStart() throws Exception
214 {
215 _selectSet = new SelectSet[_selectSets];
216 for (int i=0;i<_selectSet.length;i++)
217 _selectSet[i]= new SelectSet(i);
218
219 super.doStart();
220 }
221
222
223
224 protected void doStop() throws Exception
225 {
226 SelectSet[] sets= _selectSet;
227 _selectSet=null;
228 if (sets!=null)
229 for (int i=0;i<sets.length;i++)
230 {
231 SelectSet set = sets[i];
232 if (set!=null)
233 set.stop();
234 }
235 super.doStop();
236 }
237
238
239
240
241
242 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
243
244
245
246
247
248 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
249
250
251 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
252
253
254
255
256
257
258
259
260
261 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
262
263
264 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
265 {
266 Log.warn(ex);
267 }
268
269
270
271
272 public class SelectSet
273 {
274 private transient int _change;
275 private transient List[] _changes;
276 private transient Timeout _idleTimeout;
277 private transient int _nextSet;
278 private transient Timeout _retryTimeout;
279 private transient Selector _selector;
280 private transient int _setID;
281 private transient boolean _selecting;
282 private transient int _jvmBug;
283
284
/**
 * Creates a select set: two empty change lists, the idle and retry timeout
 * queues, and a freshly opened selector.
 *
 * @param acceptorID the index of this set
 * @throws Exception if the selector cannot be opened
 */
SelectSet(int acceptorID) throws Exception
{
    _setID = acceptorID;

    // Change queue: additions start in list 0.
    _change = 0;
    _changes = new ArrayList[] { new ArrayList(), new ArrayList() };

    // Both timeout queues are constructed with this set as their argument.
    _idleTimeout = new Timeout(this);
    _idleTimeout.setDuration(getMaxIdleTime());
    _retryTimeout = new Timeout(this);
    _retryTimeout.setDuration(0L);

    _selector = Selector.open();
}
299
300
301 public void addChange(Object point)
302 {
303 synchronized (_changes)
304 {
305 _changes[_change].add(point);
306 if (point instanceof SocketChannel)
307 _changes[_change].add(null);
308 }
309 }
310
311
312 public void addChange(SocketChannel channel, Object att)
313 {
314 synchronized (_changes)
315 {
316 _changes[_change].add(channel);
317 _changes[_change].add(att);
318 }
319 }
320
321
322 public void cancelIdle(Timeout.Task task)
323 {
324 synchronized (this)
325 {
326 task.cancel();
327 }
328 }
329
330
331
332
333
334
335
336 public void doSelect() throws IOException
337 {
338 SelectionKey key=null;
339
340 try
341 {
342 List changes;
343 synchronized (_changes)
344 {
345 changes=_changes[_change];
346 _change=_change==0?1:0;
347 _selecting=true;
348 }
349
350
351 for (int i = 0; i < changes.size(); i++)
352 {
353 try
354 {
355 Object o = changes.get(i);
356 if (o instanceof EndPoint)
357 {
358
359 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
360 endpoint.doUpdateKey();
361 }
362 else if (o instanceof Runnable)
363 {
364 dispatch((Runnable)o);
365 }
366 else if (o instanceof SocketChannel)
367 {
368
369 SocketChannel channel=(SocketChannel)o;
370 Object att = changes.get(++i);
371
372 if (channel.isConnected())
373 {
374 key = channel.register(_selector,SelectionKey.OP_READ,att);
375 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
376 key.attach(endpoint);
377 endpoint.dispatch();
378 }
379 else
380 {
381 channel.register(_selector,SelectionKey.OP_CONNECT,att);
382 }
383
384 }
385 else if (o instanceof ServerSocketChannel)
386 {
387 ServerSocketChannel channel = (ServerSocketChannel)o;
388 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
389 }
390 else
391 throw new IllegalArgumentException(o.toString());
392 }
393 catch (CancelledKeyException e)
394 {
395 if (isRunning())
396 Log.warn(e);
397 else
398 Log.debug(e);
399 }
400 }
401 changes.clear();
402
403 long idle_next = 0;
404 long retry_next = 0;
405 long now=System.currentTimeMillis();
406 synchronized (this)
407 {
408 _idleTimeout.setNow(now);
409 _retryTimeout.setNow(now);
410 if (_lowResourcesConnections>0 && _selector.keys().size()>_lowResourcesConnections)
411 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
412 else
413 _idleTimeout.setDuration(_maxIdleTime);
414 idle_next=_idleTimeout.getTimeToNext();
415 retry_next=_retryTimeout.getTimeToNext();
416 }
417
418
419 long wait = 1000L;
420 if (idle_next >= 0 && wait > idle_next)
421 wait = idle_next;
422 if (wait > 0 && retry_next >= 0 && wait > retry_next)
423 wait = retry_next;
424
425
426 if (wait > 10)
427 {
428 long before=now;
429 int selected=_selector.select(wait);
430 now = System.currentTimeMillis();
431 _idleTimeout.setNow(now);
432 _retryTimeout.setNow(now);
433
434
435 if (selected==0 && wait>0 && (now-before)<wait/2 && _selector.selectedKeys().size()==0)
436 {
437 if (_jvmBug++>5)
438 {
439
440
441 Iterator iter = _selector.keys().iterator();
442 while(iter.hasNext())
443 {
444 key = (SelectionKey) iter.next();
445 if (key.isValid()&&key.interestOps()==0)
446 {
447 key.cancel();
448 }
449 }
450 try
451 {
452 Thread.sleep(20);
453 }
454 catch (InterruptedException e)
455 {
456 Log.ignore(e);
457 }
458 }
459 }
460 else
461 _jvmBug=0;
462 }
463 else
464 {
465 _selector.selectNow();
466 _jvmBug=0;
467 }
468
469
470 if (_selector==null || !_selector.isOpen())
471 return;
472
473
474 Iterator iter = _selector.selectedKeys().iterator();
475 while (iter.hasNext())
476 {
477 key = (SelectionKey) iter.next();
478
479 try
480 {
481 if (!key.isValid())
482 {
483 key.cancel();
484 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
485 if (endpoint != null)
486 endpoint.doUpdateKey();
487 continue;
488 }
489
490 Object att = key.attachment();
491 if (att instanceof SelectChannelEndPoint)
492 {
493 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
494 endpoint.dispatch();
495 }
496 else if (key.isAcceptable())
497 {
498 SocketChannel channel = acceptChannel(key);
499 if (channel==null)
500 continue;
501
502 channel.configureBlocking(false);
503
504
505 _nextSet=++_nextSet%_selectSet.length;
506
507
508 if (_nextSet==_setID)
509 {
510
511 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
512 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
513 cKey.attach(endpoint);
514 if (endpoint != null)
515 endpoint.dispatch();
516 }
517 else
518 {
519
520 _selectSet[_nextSet].addChange(channel);
521 _selectSet[_nextSet].wakeup();
522 }
523 }
524 else if (key.isConnectable())
525 {
526
527 SocketChannel channel = (SocketChannel)key.channel();
528 boolean connected=false;
529 try
530 {
531 connected=channel.finishConnect();
532 }
533 catch(Exception e)
534 {
535 connectionFailed(channel,e,att);
536 }
537 finally
538 {
539 if (connected)
540 {
541 key.interestOps(SelectionKey.OP_READ);
542 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
543 key.attach(endpoint);
544 endpoint.dispatch();
545 }
546 else
547 {
548 key.cancel();
549 }
550 }
551 }
552 else
553 {
554
555 SocketChannel channel = (SocketChannel)key.channel();
556 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
557 key.attach(endpoint);
558 if (key.isReadable())
559 endpoint.dispatch();
560 }
561 key = null;
562 }
563 catch (CancelledKeyException e)
564 {
565 Log.ignore(e);
566 }
567 catch (Exception e)
568 {
569 if (isRunning())
570 Log.warn(e);
571 else
572 Log.ignore(e);
573
574 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
575 {
576 key.interestOps(0);
577
578 key.cancel();
579 }
580 }
581 }
582
583
584 _selector.selectedKeys().clear();
585
586
587 _idleTimeout.tick(now);
588 _retryTimeout.tick(now);
589
590 }
591 catch (CancelledKeyException e)
592 {
593 Log.ignore(e);
594 }
595 finally
596 {
597 synchronized(this)
598 {
599 _selecting=false;
600 }
601 }
602 }
603
604
605 public SelectorManager getManager()
606 {
607 return SelectorManager.this;
608 }
609
610
611 public long getNow()
612 {
613 return _idleTimeout.getNow();
614 }
615
616
617 public void scheduleIdle(Timeout.Task task)
618 {
619 synchronized (this)
620 {
621 if (_idleTimeout.getDuration() <= 0)
622 return;
623
624 task.schedule(_idleTimeout);
625 }
626 }
627
628
629 public void scheduleTimeout(Timeout.Task task, long timeout)
630 {
631 synchronized (this)
632 {
633 _retryTimeout.schedule(task, timeout);
634 }
635 }
636
637
638 public void wakeup()
639 {
640 Selector selector = _selector;
641 if (selector!=null)
642 selector.wakeup();
643 }
644
645
646 Selector getSelector()
647 {
648 return _selector;
649 }
650
651
652 void stop() throws Exception
653 {
654 boolean selecting=true;
655 while(selecting)
656 {
657 wakeup();
658 synchronized (this)
659 {
660 selecting=_selecting;
661 }
662 }
663
664 ArrayList keys=new ArrayList(_selector.keys());
665 Iterator iter =keys.iterator();
666
667 while (iter.hasNext())
668 {
669 SelectionKey key = (SelectionKey)iter.next();
670 if (key==null)
671 continue;
672 EndPoint endpoint = (EndPoint)key.attachment();
673 if (endpoint!=null)
674 {
675 try
676 {
677 endpoint.close();
678 }
679 catch(IOException e)
680 {
681 Log.ignore(e);
682 }
683 }
684 }
685
686 synchronized (this)
687 {
688 _idleTimeout.cancelAll();
689 _retryTimeout.cancelAll();
690 try
691 {
692 if (_selector != null)
693 _selector.close();
694 }
695 catch (IOException e)
696 {
697 Log.ignore(e);
698 }
699 _selector=null;
700 }
701 }
702 }
703
704 }