1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.mortbay.component.AbstractLifeCycle;
30 import org.mortbay.io.Connection;
31 import org.mortbay.io.EndPoint;
32 import org.mortbay.log.Log;
33 import org.mortbay.thread.Timeout;
34
35
36
37
38
39
40
41
42
43
44 public abstract class SelectorManager extends AbstractLifeCycle
45 {
46
/** Number of suspicious (empty, early-returning) selects counted by a select set at which its selector is recreated; read from system property {@code org.mortbay.io.nio.JVMBUG_THRESHHOLD}, default 512. */
private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
/** Length in milliseconds of the period over which the select rate is measured; system property {@code org.mortbay.io.nio.MONITOR_PERIOD}, default 1000. */
private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
/** Normalised select rate per monitor period above which a select set starts pausing before each select; system property {@code org.mortbay.io.nio.MAX_SELECTS}, default 15000. */
private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
/** Sleep in milliseconds injected when a select set is pausing or working around the suspected JVM bug; system property {@code org.mortbay.io.nio.BUSY_PAUSE}, default 50. */
private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
/** Number of consecutive busy selections of the same key after which that key is cancelled; busy-key detection only runs when this is greater than 0; system property {@code org.mortbay.io.nio.BUSY_KEY}, default -1. */
private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();

/** Flag exposed through isDelaySelectKeyUpdate()/setDelaySelectKeyUpdate(); not read elsewhere in this class (presumably consulted by endpoints - confirm). */
private boolean _delaySelectKeyUpdate=true;
/** Idle timeout duration applied by each select set when the low-resources limit is not exceeded. */
private long _maxIdleTime;
/** Low-resources connection limit stored PER select set (the public accessors multiply/divide by _selectSets); 0 or less disables the check. */
private long _lowResourcesConnections;
/** Idle timeout duration applied by a select set while it holds more keys than _lowResourcesConnections. */
private long _lowResourcesMaxIdleTime;
/** The select sets; created in doStart() and set to null in doStop(). */
private transient SelectSet[] _selectSet;
/** Number of select sets created by doStart(). */
private int _selectSets=1;
/** Counter used by the register methods to spread channels over the select sets. */
private volatile int _set;
60
61
62
63
64
65
66 public void setMaxIdleTime(long maxIdleTime)
67 {
68 _maxIdleTime=maxIdleTime;
69 }
70
71
72
73
74
75 public void setSelectSets(int selectSets)
76 {
77 long lrc = _lowResourcesConnections * _selectSets;
78 _selectSets=selectSets;
79 _lowResourcesConnections=lrc/_selectSets;
80 }
81
82
83
84
85
86 public long getMaxIdleTime()
87 {
88 return _maxIdleTime;
89 }
90
91
92
93
94
95 public int getSelectSets()
96 {
97 return _selectSets;
98 }
99
100
101
102
103
104 public boolean isDelaySelectKeyUpdate()
105 {
106 return _delaySelectKeyUpdate;
107 }
108
109
110
111
112
113
114
115 public void register(SocketChannel channel, Object att) throws IOException
116 {
117 int s=_set++;
118 s=s%_selectSets;
119 SelectSet[] sets=_selectSet;
120 if (sets!=null)
121 {
122 SelectSet set=sets[s];
123 set.addChange(channel,att);
124 set.wakeup();
125 }
126 }
127
128
129
130
131
132
133
134 public void register(ServerSocketChannel acceptChannel) throws IOException
135 {
136 int s=_set++;
137 s=s%_selectSets;
138 SelectSet set=_selectSet[s];
139 set.addChange(acceptChannel);
140 set.wakeup();
141 }
142
143
144
145
146
147 public long getLowResourcesConnections()
148 {
149 return _lowResourcesConnections*_selectSets;
150 }
151
152
153
154
155
156
157
158
159 public void setLowResourcesConnections(long lowResourcesConnections)
160 {
161 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
162 }
163
164
165
166
167
168 public long getLowResourcesMaxIdleTime()
169 {
170 return _lowResourcesMaxIdleTime;
171 }
172
173
174
175
176
177
178 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179 {
180 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
181 }
182
183
184
185
186
187
188 public void doSelect(int acceptorID) throws IOException
189 {
190 SelectSet[] sets= _selectSet;
191 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
192 sets[acceptorID].doSelect();
193 }
194
195
196
197
198
199
200 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201 {
202 _delaySelectKeyUpdate=delaySelectKeyUpdate;
203 }
204
205
206
207
208
209
210
/**
 * Accepts a connection for a selected key.
 * <p>
 * Called from SelectSet.doSelect() for a selected key that is acceptable and
 * whose attachment is not a SelectChannelEndPoint. A null return makes the
 * caller skip the key; a non-null channel is switched to non-blocking mode
 * by the caller and then registered with a select set.
 *
 * @param key the selected, acceptable key
 * @return the accepted channel, or null if there is nothing to process
 * @throws IOException if accepting fails
 */
protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212
213
/**
 * Hands a task to the implementation for execution.
 * <p>
 * Within this class it is called from SelectSet.doSelect() for Runnable
 * entries in the change list and to close the endpoint of a cancelled busy
 * key; both call sites ignore the returned value.
 *
 * @param task the task to run
 * @return implementation defined (presumably whether the task was accepted - confirm against implementations)
 * @throws IOException if the task cannot be dispatched
 */
public abstract boolean dispatch(Runnable task) throws IOException;
215
216
217
218
219
220 protected void doStart() throws Exception
221 {
222 _selectSet = new SelectSet[_selectSets];
223 for (int i=0;i<_selectSet.length;i++)
224 _selectSet[i]= new SelectSet(i);
225
226 super.doStart();
227 }
228
229
230
231 protected void doStop() throws Exception
232 {
233 SelectSet[] sets= _selectSet;
234 _selectSet=null;
235 if (sets!=null)
236 for (int i=0;i<sets.length;i++)
237 {
238 SelectSet set = sets[i];
239 if (set!=null)
240 set.stop();
241 }
242 super.doStop();
243 }
244
245
246
247
248
/**
 * Notification hook for a closed endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this class; presumably called by
 * SelectChannelEndPoint when it closes - confirm against that class.
 *
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250
251
252
253
254
/**
 * Notification hook for an opened endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this class; presumably called when a
 * SelectChannelEndPoint is created - confirm against that class.
 *
 * @param endpoint the endpoint concerned
 */
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256
257
/**
 * Factory for the Connection associated with a channel and its endpoint.
 * <p>
 * NOTE(review): not invoked anywhere in this class; presumably used by
 * endpoint implementations - confirm against callers.
 *
 * @param channel  the socket channel
 * @param endpoint the endpoint wrapping the channel
 * @return the connection for the endpoint
 */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259
260
261
262
263
264
265
266
267
/**
 * Factory for the endpoint of a channel that has been registered with a
 * select set.
 * <p>
 * SelectSet.doSelect() calls this after obtaining the channel's selection
 * key, attaches the returned endpoint to that key and, in most paths,
 * immediately calls dispatch() on it. Only the accept path checks the result
 * for null before dispatching, so implementations should not return null.
 *
 * @param channel   the connected socket channel
 * @param selectSet the select set the channel is registered with
 * @param sKey      the selection key of the channel in that set's selector
 * @return the new endpoint
 * @throws IOException if the endpoint cannot be created
 */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269
270
271 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272 {
273 Log.warn(ex);
274 }
275
276
277
278
279 public class SelectSet
280 {
/** Index (0 or 1) of the list in _changes that addChange() currently appends to; flipped at the start of every doSelect(). */
private transient int _change;
/** Two change lists: one collects new changes while the other is processed by doSelect(). The array itself is the lock guarding them. */
private transient List[] _changes;
/** Timeout queue for idle tasks; its duration is switched between the normal and low-resources idle time in doSelect(). */
private transient Timeout _idleTimeout;
/** Index of the select set that received the most recently accepted connection (advanced per accept). */
private transient int _nextSet;
/** Timeout queue for tasks scheduled through scheduleTimeout(). */
private transient Timeout _retryTimeout;
/** The selector of this set; replaced when the JVM-bug workaround recreates it and set to null by stop(). */
private transient Selector _selector;
/** Index of this set within the manager's array of select sets. */
private transient int _setID;
/** True while doSelect() is executing; stop() spins on it. */
private volatile boolean _selecting;
/** Count of suspicious selects (returned nothing, well before the wait elapsed) in the current monitor period. */
private transient int _jvmBug;
/** Number of selects performed in the current monitor period. */
private int _selects;
/** Time (System.currentTimeMillis) at which the current monitor period started. */
private long _monitorStart;
/** Time after which the current monitor period is evaluated and restarted. */
private long _monitorNext;
/** True when the last evaluated select rate exceeded __MAX_SELECTS; doSelect() then sleeps before selecting. */
private boolean _pausing;
/** The key last seen as the single selected key while the select count was above __MAX_SELECTS. */
private SelectionKey _busyKey;
/** Number of consecutive times _busyKey was seen again as the busy key. */
private int _busyKeyCount;
/** Time after which the accumulated statistics below are logged and reset (every 60000 ms). */
private long _log;
/** Number of monitor periods since the last log in which pausing was switched on. */
private int _paused;
/** Number of times keys with zero interest ops were cancelled since the last log. */
private int _jvmFix0;
/** Number of times the selector was recreated since the last log. */
private int _jvmFix1;
/** Number of times the suspicious-select count passed the threshold and delays started being injected since the last log. */
private int _jvmFix2;
301
302
303 SelectSet(int acceptorID) throws Exception
304 {
305 _setID=acceptorID;
306
307 _idleTimeout = new Timeout(this);
308 _idleTimeout.setDuration(getMaxIdleTime());
309 _retryTimeout = new Timeout(this);
310 _retryTimeout.setDuration(0L);
311
312
313 _selector = Selector.open();
314 _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
315 _change=0;
316 _monitorStart=System.currentTimeMillis();
317 _monitorNext=_monitorStart+__MONITOR_PERIOD;
318 _log=_monitorStart+60000;
319 }
320
321
322 public void addChange(Object point)
323 {
324 synchronized (_changes)
325 {
326 _changes[_change].add(point);
327 }
328 }
329
330
331 public void addChange(SelectableChannel channel, Object att)
332 {
333 if (att==null)
334 addChange(channel);
335 else if (att instanceof EndPoint)
336 addChange(att);
337 else
338 addChange(new ChangeSelectableChannel(channel,att));
339 }
340
341
342 public void cancelIdle(Timeout.Task task)
343 {
344 synchronized (this)
345 {
346 task.cancel();
347 }
348 }
349
350
351
352
353
354
355
356 public void doSelect() throws IOException
357 {
358 SelectionKey key=null;
359
360 try
361 {
362 List changes;
363 final Selector selector;
364 synchronized (_changes)
365 {
366 changes=_changes[_change];
367 _change=_change==0?1:0;
368 _selecting=true;
369 selector=_selector;
370 }
371
372
373 try
374 {
375 for (int i = 0; i < changes.size(); i++)
376 {
377 try
378 {
379 Object o = changes.get(i);
380
381 if (o instanceof EndPoint)
382 {
383
384 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
385 endpoint.doUpdateKey();
386 }
387 else if (o instanceof Runnable)
388 {
389 dispatch((Runnable)o);
390 }
391 else if (o instanceof ChangeSelectableChannel)
392 {
393
394 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
395 final SelectableChannel channel=asc._channel;
396 final Object att = asc._attachment;
397
398 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
399 {
400 key = channel.register(selector,SelectionKey.OP_READ,att);
401 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
402 key.attach(endpoint);
403 endpoint.dispatch();
404 }
405 else if (channel.isOpen())
406 {
407 channel.register(selector,SelectionKey.OP_CONNECT,att);
408 }
409 }
410 else if (o instanceof SocketChannel)
411 {
412 final SocketChannel channel=(SocketChannel)o;
413
414 if (channel.isConnected())
415 {
416 key = channel.register(selector,SelectionKey.OP_READ,null);
417 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
418 key.attach(endpoint);
419 endpoint.dispatch();
420 }
421 else if (channel.isOpen())
422 {
423 channel.register(selector,SelectionKey.OP_CONNECT,null);
424 }
425 }
426 else if (o instanceof ServerSocketChannel)
427 {
428 ServerSocketChannel channel = (ServerSocketChannel)o;
429 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
430 }
431 else if (o instanceof ChangeTask)
432 {
433 ((ChangeTask)o).run();
434 }
435 else
436 throw new IllegalArgumentException(o.toString());
437 }
438 catch (Exception e)
439 {
440 if (isRunning())
441 Log.warn(e);
442 else
443 Log.debug(e);
444 }
445 catch (Error e)
446 {
447 if (isRunning())
448 Log.warn(e);
449 else
450 Log.debug(e);
451 }
452 }
453 }
454 finally
455 {
456 changes.clear();
457 }
458
459 long idle_next = 0;
460 long retry_next = 0;
461 long now=System.currentTimeMillis();
462 synchronized (this)
463 {
464 _idleTimeout.setNow(now);
465 _retryTimeout.setNow(now);
466 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
467 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
468 else
469 _idleTimeout.setDuration(_maxIdleTime);
470 idle_next=_idleTimeout.getTimeToNext();
471 retry_next=_retryTimeout.getTimeToNext();
472 }
473
474
475 long wait = 1000L;
476 if (idle_next >= 0 && wait > idle_next)
477 wait = idle_next;
478 if (wait > 0 && retry_next >= 0 && wait > retry_next)
479 wait = retry_next;
480
481
482 if (wait > 2)
483 {
484
485 if (_pausing)
486 {
487 try
488 {
489 Thread.sleep(__BUSY_PAUSE);
490 }
491 catch(InterruptedException e)
492 {
493 Log.ignore(e);
494 }
495 }
496
497 long before=now;
498 int selected=selector.select(wait);
499 now = System.currentTimeMillis();
500 _idleTimeout.setNow(now);
501 _retryTimeout.setNow(now);
502 _selects++;
503
504
505
506
507 if (now>_monitorNext)
508 {
509 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
510 _pausing=_selects>__MAX_SELECTS;
511 if (_pausing)
512 _paused++;
513
514 _selects=0;
515 _jvmBug=0;
516 _monitorStart=now;
517 _monitorNext=now+__MONITOR_PERIOD;
518 }
519
520 if (now>_log)
521 {
522 if (_paused>0)
523 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
524
525 if (_jvmFix2>0)
526 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
527
528 if (_jvmFix1>0)
529 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
530
531 else if(Log.isDebugEnabled() && _jvmFix0>0)
532 Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
533 _paused=0;
534 _jvmFix2=0;
535 _jvmFix1=0;
536 _jvmFix0=0;
537 _log=now+60000;
538 }
539
540
541 if (selected==0 && wait>10 && (now-before)<(wait/2))
542 {
543
544 _jvmBug++;
545 if (_jvmBug>(__JVMBUG_THRESHHOLD))
546 {
547 try
548 {
549 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
550 _jvmFix2++;
551
552 Thread.sleep(__BUSY_PAUSE);
553 }
554 catch(InterruptedException e)
555 {
556 Log.ignore(e);
557 }
558 }
559 else if (_jvmBug==__JVMBUG_THRESHHOLD)
560 {
561 synchronized (this)
562 {
563
564 _jvmFix1++;
565
566 final Selector new_selector = Selector.open();
567 Iterator iterator = _selector.keys().iterator();
568 while (iterator.hasNext())
569 {
570 SelectionKey k = (SelectionKey)iterator.next();
571 if (!k.isValid() || k.interestOps()==0)
572 continue;
573
574 final SelectableChannel channel = k.channel();
575 final Object attachment = k.attachment();
576
577 if (attachment==null)
578 addChange(channel);
579 else
580 addChange(channel,attachment);
581 }
582 Selector old_selector=_selector;
583 _selector=new_selector;
584 try
585 {
586 old_selector.close();
587 }
588 catch(Exception e)
589 {
590 Log.warn(e);
591 }
592 return;
593 }
594 }
595 else if (_jvmBug%32==31)
596 {
597
598 int cancelled=0;
599 Iterator iter = selector.keys().iterator();
600 while(iter.hasNext())
601 {
602 SelectionKey k = (SelectionKey) iter.next();
603 if (k.isValid()&&k.interestOps()==0)
604 {
605 k.cancel();
606 cancelled++;
607 }
608 }
609 if (cancelled>0)
610 _jvmFix0++;
611
612 return;
613 }
614 }
615 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
616 {
617
618 SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
619 if (busy==_busyKey)
620 {
621 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
622 {
623 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
624 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
625 busy.cancel();
626 if (endpoint!=null)
627 {
628 dispatch(new Runnable()
629 {
630 public void run()
631 {
632 try
633 {
634 endpoint.close();
635 }
636 catch (IOException e)
637 {
638 Log.ignore(e);
639 }
640 }
641 });
642 }
643 }
644 }
645 else
646 _busyKeyCount=0;
647 _busyKey=busy;
648 }
649 }
650 else
651 {
652 selector.selectNow();
653 _selects++;
654 }
655
656
657 if (_selector==null || !selector.isOpen())
658 return;
659
660
661 Iterator iter = selector.selectedKeys().iterator();
662 while (iter.hasNext())
663 {
664 key = (SelectionKey) iter.next();
665
666 try
667 {
668 if (!key.isValid())
669 {
670 key.cancel();
671 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
672 if (endpoint != null)
673 endpoint.doUpdateKey();
674 continue;
675 }
676
677 Object att = key.attachment();
678
679 if (att instanceof SelectChannelEndPoint)
680 {
681 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
682 endpoint.dispatch();
683 }
684 else if (key.isAcceptable())
685 {
686 SocketChannel channel = acceptChannel(key);
687 if (channel==null)
688 continue;
689
690 channel.configureBlocking(false);
691
692
693 _nextSet=++_nextSet%_selectSet.length;
694
695
696 if (_nextSet==_setID)
697 {
698
699 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
700 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
701 cKey.attach(endpoint);
702 if (endpoint != null)
703 endpoint.dispatch();
704 }
705 else
706 {
707
708 _selectSet[_nextSet].addChange(channel);
709 _selectSet[_nextSet].wakeup();
710 }
711 }
712 else if (key.isConnectable())
713 {
714
715 SocketChannel channel = (SocketChannel)key.channel();
716 boolean connected=false;
717 try
718 {
719 connected=channel.finishConnect();
720 }
721 catch(Exception e)
722 {
723 connectionFailed(channel,e,att);
724 }
725 finally
726 {
727 if (connected)
728 {
729 key.interestOps(SelectionKey.OP_READ);
730 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
731 key.attach(endpoint);
732 endpoint.dispatch();
733 }
734 else
735 {
736 key.cancel();
737 }
738 }
739 }
740 else
741 {
742
743 SocketChannel channel = (SocketChannel)key.channel();
744 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
745 key.attach(endpoint);
746 if (key.isReadable())
747 endpoint.dispatch();
748 }
749 key = null;
750 }
751 catch (CancelledKeyException e)
752 {
753 Log.ignore(e);
754 }
755 catch (Exception e)
756 {
757 if (isRunning())
758 Log.warn(e);
759 else
760 Log.ignore(e);
761
762 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
763 {
764 key.interestOps(0);
765
766 key.cancel();
767 }
768 }
769 }
770
771
772 selector.selectedKeys().clear();
773
774
775 _idleTimeout.tick(now);
776 _retryTimeout.tick(now);
777
778 }
779 catch (ClosedSelectorException e)
780 {
781 Log.warn(e);
782 }
783 catch (CancelledKeyException e)
784 {
785 Log.ignore(e);
786 }
787 finally
788 {
789 _selecting=false;
790 }
791 }
792
793
794 public SelectorManager getManager()
795 {
796 return SelectorManager.this;
797 }
798
799
800 public long getNow()
801 {
802 return _idleTimeout.getNow();
803 }
804
805
806 public void scheduleIdle(Timeout.Task task)
807 {
808 synchronized (this)
809 {
810 if (_idleTimeout.getDuration() <= 0)
811 return;
812
813 task.schedule(_idleTimeout);
814 }
815 }
816
817
818 public void scheduleTimeout(Timeout.Task task, long timeout)
819 {
820 synchronized (this)
821 {
822 _retryTimeout.schedule(task, timeout);
823 }
824 }
825
826
827 public void wakeup()
828 {
829 Selector selector = _selector;
830 if (selector!=null)
831 selector.wakeup();
832 }
833
834
835 Selector getSelector()
836 {
837 return _selector;
838 }
839
840
841 void stop() throws Exception
842 {
843 boolean selecting=true;
844 while(selecting)
845 {
846 wakeup();
847 selecting=_selecting;
848 }
849
850 ArrayList keys=new ArrayList(_selector.keys());
851 Iterator iter =keys.iterator();
852
853 while (iter.hasNext())
854 {
855 SelectionKey key = (SelectionKey)iter.next();
856 if (key==null)
857 continue;
858 Object att=key.attachment();
859 if (att instanceof EndPoint)
860 {
861 EndPoint endpoint = (EndPoint)att;
862 try
863 {
864 endpoint.close();
865 }
866 catch(IOException e)
867 {
868 Log.ignore(e);
869 }
870 }
871 }
872
873 synchronized (this)
874 {
875 selecting=_selecting;
876 while(selecting)
877 {
878 wakeup();
879 selecting=_selecting;
880 }
881
882 _idleTimeout.cancelAll();
883 _retryTimeout.cancelAll();
884 try
885 {
886 if (_selector != null)
887 _selector.close();
888 }
889 catch (IOException e)
890 {
891 Log.ignore(e);
892 }
893 _selector=null;
894 }
895 }
896 }
897
898
899 private static class ChangeSelectableChannel
900 {
901 final SelectableChannel _channel;
902 final Object _attachment;
903
904 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
905 {
906 super();
907 _channel = channel;
908 _attachment = attachment;
909 }
910 }
911
912
913 private interface ChangeTask
914 {
915 public void run();
916 }
917 }