1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.CancelledKeyException;
19 import java.nio.channels.ClosedSelectorException;
20 import java.nio.channels.SelectableChannel;
21 import java.nio.channels.SelectionKey;
22 import java.nio.channels.Selector;
23 import java.nio.channels.ServerSocketChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.mortbay.component.AbstractLifeCycle;
30 import org.mortbay.io.Connection;
31 import org.mortbay.io.EndPoint;
32 import org.mortbay.log.Log;
33 import org.mortbay.thread.Timeout;
34
35
36
37
38
39
40
41
42
43
44 public abstract class SelectorManager extends AbstractLifeCycle
45 {
46
47 private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
48 private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
49 private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
50 private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
51 private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
52
53 private boolean _delaySelectKeyUpdate=true;
54 private long _maxIdleTime;
55 private long _lowResourcesConnections;
56 private long _lowResourcesMaxIdleTime;
57 private transient SelectSet[] _selectSet;
58 private int _selectSets=1;
59 private volatile int _set;
60
61
62
63
64
65
66 public void setMaxIdleTime(long maxIdleTime)
67 {
68 _maxIdleTime=maxIdleTime;
69 }
70
71
72
73
74
75 public void setSelectSets(int selectSets)
76 {
77 long lrc = _lowResourcesConnections * _selectSets;
78 _selectSets=selectSets;
79 _lowResourcesConnections=lrc/_selectSets;
80 }
81
82
83
84
85
86 public long getMaxIdleTime()
87 {
88 return _maxIdleTime;
89 }
90
91
92
93
94
95 public int getSelectSets()
96 {
97 return _selectSets;
98 }
99
100
101
102
103
104 public boolean isDelaySelectKeyUpdate()
105 {
106 return _delaySelectKeyUpdate;
107 }
108
109
110
111
112
113
114
115 public void register(SocketChannel channel, Object att) throws IOException
116 {
117 int s=_set++;
118 s=s%_selectSets;
119 SelectSet[] sets=_selectSet;
120 if (sets!=null)
121 {
122 SelectSet set=sets[s];
123 set.addChange(channel,att);
124 set.wakeup();
125 }
126 }
127
128
129
130
131
132
133
134 public void register(ServerSocketChannel acceptChannel) throws IOException
135 {
136 int s=_set++;
137 s=s%_selectSets;
138 SelectSet set=_selectSet[s];
139 set.addChange(acceptChannel);
140 set.wakeup();
141 }
142
143
144
145
146
147 public long getLowResourcesConnections()
148 {
149 return _lowResourcesConnections*_selectSets;
150 }
151
152
153
154
155
156
157
158
159 public void setLowResourcesConnections(long lowResourcesConnections)
160 {
161 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
162 }
163
164
165
166
167
168 public long getLowResourcesMaxIdleTime()
169 {
170 return _lowResourcesMaxIdleTime;
171 }
172
173
174
175
176
177
178 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179 {
180 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
181 }
182
183
184
185
186
187
188 public void doSelect(int acceptorID) throws IOException
189 {
190 SelectSet[] sets= _selectSet;
191 if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
192 sets[acceptorID].doSelect();
193 }
194
195
196
197
198
199
200 public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201 {
202 _delaySelectKeyUpdate=delaySelectKeyUpdate;
203 }
204
205
206
207
208
209
210
211 protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212
213
214 public abstract boolean dispatch(Runnable task) throws IOException;
215
216
217
218
219
220 protected void doStart() throws Exception
221 {
222 _selectSet = new SelectSet[_selectSets];
223 for (int i=0;i<_selectSet.length;i++)
224 _selectSet[i]= new SelectSet(i);
225
226 super.doStart();
227 }
228
229
230
231 protected void doStop() throws Exception
232 {
233 SelectSet[] sets= _selectSet;
234 _selectSet=null;
235 if (sets!=null)
236 for (int i=0;i<sets.length;i++)
237 {
238 SelectSet set = sets[i];
239 if (set!=null)
240 set.stop();
241 }
242 super.doStop();
243 }
244
245
246
247
248
249 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250
251
252
253
254
255 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256
257
258 protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259
260
261
262
263
264
265
266
267
268 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269
270
271 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272 {
273 Log.warn(ex);
274 }
275
276
277
278
279 public class SelectSet
280 {
281 private transient int _change;
282 private transient List[] _changes;
283 private transient Timeout _idleTimeout;
284 private transient int _nextSet;
285 private transient Timeout _retryTimeout;
286 private transient Selector _selector;
287 private transient int _setID;
288 private volatile boolean _selecting;
289 private transient int _jvmBug;
290 private int _selects;
291 private long _monitorStart;
292 private long _monitorNext;
293 private boolean _pausing;
294 private SelectionKey _busyKey;
295 private int _busyKeyCount;
296 private long _log;
297 private int _paused;
298 private int _jvmFix0;
299 private int _jvmFix1;
300 private int _jvmFix2;
301
302
303 SelectSet(int acceptorID) throws Exception
304 {
305 _setID=acceptorID;
306
307 _idleTimeout = new Timeout(this);
308 _idleTimeout.setDuration(getMaxIdleTime());
309 _retryTimeout = new Timeout(this);
310 _retryTimeout.setDuration(0L);
311
312
313 _selector = Selector.open();
314 _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
315 _change=0;
316 _monitorStart=System.currentTimeMillis();
317 _monitorNext=_monitorStart+__MONITOR_PERIOD;
318 _log=_monitorStart+60000;
319 }
320
321
322 public void addChange(Object point)
323 {
324 synchronized (_changes)
325 {
326 _changes[_change].add(point);
327 }
328 }
329
330
331 public void addChange(SelectableChannel channel, Object att)
332 {
333 if (att==null)
334 addChange(channel);
335 else if (att instanceof EndPoint)
336 addChange(att);
337 else
338 addChange(new ChangeSelectableChannel(channel,att));
339 }
340
341
342 public void cancelIdle(Timeout.Task task)
343 {
344 synchronized (this)
345 {
346 task.cancel();
347 }
348 }
349
350
351
352
353
354
355
356 public void doSelect() throws IOException
357 {
358 SelectionKey key=null;
359
360 try
361 {
362 List changes;
363 final Selector selector;
364 synchronized (_changes)
365 {
366 changes=_changes[_change];
367 _change=_change==0?1:0;
368 _selecting=true;
369 selector=_selector;
370 }
371
372
373 for (int i = 0; i < changes.size(); i++)
374 {
375 try
376 {
377 Object o = changes.get(i);
378
379 if (o instanceof EndPoint)
380 {
381
382 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
383 endpoint.doUpdateKey();
384 }
385 else if (o instanceof Runnable)
386 {
387 dispatch((Runnable)o);
388 }
389 else if (o instanceof ChangeSelectableChannel)
390 {
391
392 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
393 final SelectableChannel channel=asc._channel;
394 final Object att = asc._attachment;
395
396 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
397 {
398 key = channel.register(selector,SelectionKey.OP_READ,att);
399 SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
400 key.attach(endpoint);
401 endpoint.dispatch();
402 }
403 else if (channel.isOpen())
404 {
405 channel.register(selector,SelectionKey.OP_CONNECT,att);
406 }
407 }
408 else if (o instanceof SocketChannel)
409 {
410 final SocketChannel channel=(SocketChannel)o;
411
412 if (channel.isConnected())
413 {
414 key = channel.register(selector,SelectionKey.OP_READ,null);
415 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
416 key.attach(endpoint);
417 endpoint.dispatch();
418 }
419 else if (channel.isOpen())
420 {
421 channel.register(selector,SelectionKey.OP_CONNECT,null);
422 }
423 }
424 else if (o instanceof ServerSocketChannel)
425 {
426 ServerSocketChannel channel = (ServerSocketChannel)o;
427 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
428 }
429 else if (o instanceof ChangeTask)
430 {
431 ((ChangeTask)o).run();
432 }
433 else
434 throw new IllegalArgumentException(o.toString());
435 }
436 catch (Exception e)
437 {
438 if (isRunning())
439 Log.warn(e);
440 else
441 Log.debug(e);
442 }
443 }
444 changes.clear();
445
446 long idle_next = 0;
447 long retry_next = 0;
448 long now=System.currentTimeMillis();
449 synchronized (this)
450 {
451 _idleTimeout.setNow(now);
452 _retryTimeout.setNow(now);
453 if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
454 _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
455 else
456 _idleTimeout.setDuration(_maxIdleTime);
457 idle_next=_idleTimeout.getTimeToNext();
458 retry_next=_retryTimeout.getTimeToNext();
459 }
460
461
462 long wait = 1000L;
463 if (idle_next >= 0 && wait > idle_next)
464 wait = idle_next;
465 if (wait > 0 && retry_next >= 0 && wait > retry_next)
466 wait = retry_next;
467
468
469 if (wait > 2)
470 {
471
472 if (_pausing)
473 {
474 try
475 {
476 Thread.sleep(__BUSY_PAUSE);
477 }
478 catch(InterruptedException e)
479 {
480 Log.ignore(e);
481 }
482 }
483
484 long before=now;
485 int selected=selector.select(wait);
486 now = System.currentTimeMillis();
487 _idleTimeout.setNow(now);
488 _retryTimeout.setNow(now);
489 _selects++;
490
491
492
493
494 if (now>_monitorNext)
495 {
496 _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
497 _pausing=_selects>__MAX_SELECTS;
498 if (_pausing)
499 _paused++;
500
501 _selects=0;
502 _jvmBug=0;
503 _monitorStart=now;
504 _monitorNext=now+__MONITOR_PERIOD;
505 }
506
507 if (now>_log)
508 {
509 if (_paused>0)
510 Log.info(this+" Busy selector - injecting delay "+_paused+" times");
511
512 if (_jvmFix2>0)
513 Log.info(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
514
515 if (_jvmFix1>0)
516 Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
517
518 else if(Log.isDebugEnabled() && _jvmFix0>0)
519 Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
520 _paused=0;
521 _jvmFix2=0;
522 _jvmFix1=0;
523 _jvmFix0=0;
524 _log=now+60000;
525 }
526
527
528 if (selected==0 && wait>10 && (now-before)<(wait/2))
529 {
530
531 _jvmBug++;
532 if (_jvmBug>(__JVMBUG_THRESHHOLD))
533 {
534 try
535 {
536 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
537 _jvmFix2++;
538
539 Thread.sleep(__BUSY_PAUSE);
540 }
541 catch(InterruptedException e)
542 {
543 Log.ignore(e);
544 }
545 }
546 else if (_jvmBug==__JVMBUG_THRESHHOLD)
547 {
548 synchronized (this)
549 {
550
551 _jvmFix1++;
552
553 final Selector new_selector = Selector.open();
554 Iterator iterator = _selector.keys().iterator();
555 while (iterator.hasNext())
556 {
557 SelectionKey k = (SelectionKey)iterator.next();
558 if (!k.isValid() || k.interestOps()==0)
559 continue;
560
561 final SelectableChannel channel = k.channel();
562 final Object attachment = k.attachment();
563
564 if (attachment==null)
565 addChange(channel);
566 else
567 addChange(channel,attachment);
568 }
569 Selector old_selector=_selector;
570 _selector=new_selector;
571 try
572 {
573 old_selector.close();
574 }
575 catch(Exception e)
576 {
577 Log.warn(e);
578 }
579 return;
580 }
581 }
582 else if (_jvmBug%32==31)
583 {
584
585 int cancelled=0;
586 Iterator iter = selector.keys().iterator();
587 while(iter.hasNext())
588 {
589 SelectionKey k = (SelectionKey) iter.next();
590 if (k.isValid()&&k.interestOps()==0)
591 {
592 k.cancel();
593 cancelled++;
594 }
595 }
596 if (cancelled>0)
597 _jvmFix0++;
598
599 return;
600 }
601 }
602 else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
603 {
604
605 SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
606 if (busy==_busyKey)
607 {
608 if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
609 {
610 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
611 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
612 busy.cancel();
613 if (endpoint!=null)
614 {
615 dispatch(new Runnable()
616 {
617 public void run()
618 {
619 try
620 {
621 endpoint.close();
622 }
623 catch (IOException e)
624 {
625 Log.ignore(e);
626 }
627 }
628 });
629 }
630 }
631 }
632 else
633 _busyKeyCount=0;
634 _busyKey=busy;
635 }
636 }
637 else
638 {
639 selector.selectNow();
640 _selects++;
641 }
642
643
644 if (_selector==null || !selector.isOpen())
645 return;
646
647
648 Iterator iter = selector.selectedKeys().iterator();
649 while (iter.hasNext())
650 {
651 key = (SelectionKey) iter.next();
652
653 try
654 {
655 if (!key.isValid())
656 {
657 key.cancel();
658 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
659 if (endpoint != null)
660 endpoint.doUpdateKey();
661 continue;
662 }
663
664 Object att = key.attachment();
665
666 if (att instanceof SelectChannelEndPoint)
667 {
668 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
669 endpoint.dispatch();
670 }
671 else if (key.isAcceptable())
672 {
673 SocketChannel channel = acceptChannel(key);
674 if (channel==null)
675 continue;
676
677 channel.configureBlocking(false);
678
679
680 _nextSet=++_nextSet%_selectSet.length;
681
682
683 if (_nextSet==_setID)
684 {
685
686 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
687 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
688 cKey.attach(endpoint);
689 if (endpoint != null)
690 endpoint.dispatch();
691 }
692 else
693 {
694
695 _selectSet[_nextSet].addChange(channel);
696 _selectSet[_nextSet].wakeup();
697 }
698 }
699 else if (key.isConnectable())
700 {
701
702 SocketChannel channel = (SocketChannel)key.channel();
703 boolean connected=false;
704 try
705 {
706 connected=channel.finishConnect();
707 }
708 catch(Exception e)
709 {
710 connectionFailed(channel,e,att);
711 }
712 finally
713 {
714 if (connected)
715 {
716 key.interestOps(SelectionKey.OP_READ);
717 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
718 key.attach(endpoint);
719 endpoint.dispatch();
720 }
721 else
722 {
723 key.cancel();
724 }
725 }
726 }
727 else
728 {
729
730 SocketChannel channel = (SocketChannel)key.channel();
731 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
732 key.attach(endpoint);
733 if (key.isReadable())
734 endpoint.dispatch();
735 }
736 key = null;
737 }
738 catch (CancelledKeyException e)
739 {
740 Log.ignore(e);
741 }
742 catch (Exception e)
743 {
744 if (isRunning())
745 Log.warn(e);
746 else
747 Log.ignore(e);
748
749 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
750 {
751 key.interestOps(0);
752
753 key.cancel();
754 }
755 }
756 }
757
758
759 selector.selectedKeys().clear();
760
761
762 _idleTimeout.tick(now);
763 _retryTimeout.tick(now);
764
765 }
766 catch (ClosedSelectorException e)
767 {
768 Log.warn(e);
769 }
770 catch (CancelledKeyException e)
771 {
772 Log.ignore(e);
773 }
774 finally
775 {
776 _selecting=false;
777 }
778 }
779
780
781 public SelectorManager getManager()
782 {
783 return SelectorManager.this;
784 }
785
786
787 public long getNow()
788 {
789 return _idleTimeout.getNow();
790 }
791
792
793 public void scheduleIdle(Timeout.Task task)
794 {
795 synchronized (this)
796 {
797 if (_idleTimeout.getDuration() <= 0)
798 return;
799
800 task.schedule(_idleTimeout);
801 }
802 }
803
804
805 public void scheduleTimeout(Timeout.Task task, long timeout)
806 {
807 synchronized (this)
808 {
809 _retryTimeout.schedule(task, timeout);
810 }
811 }
812
813
814 public void wakeup()
815 {
816 Selector selector = _selector;
817 if (selector!=null)
818 selector.wakeup();
819 }
820
821
822 Selector getSelector()
823 {
824 return _selector;
825 }
826
827
828 void stop() throws Exception
829 {
830 boolean selecting=true;
831 while(selecting)
832 {
833 wakeup();
834 selecting=_selecting;
835 }
836
837 ArrayList keys=new ArrayList(_selector.keys());
838 Iterator iter =keys.iterator();
839
840 while (iter.hasNext())
841 {
842 SelectionKey key = (SelectionKey)iter.next();
843 if (key==null)
844 continue;
845 Object att=key.attachment();
846 if (att instanceof EndPoint)
847 {
848 EndPoint endpoint = (EndPoint)att;
849 try
850 {
851 endpoint.close();
852 }
853 catch(IOException e)
854 {
855 Log.ignore(e);
856 }
857 }
858 }
859
860 synchronized (this)
861 {
862 selecting=_selecting;
863 while(selecting)
864 {
865 wakeup();
866 selecting=_selecting;
867 }
868
869 _idleTimeout.cancelAll();
870 _retryTimeout.cancelAll();
871 try
872 {
873 if (_selector != null)
874 _selector.close();
875 }
876 catch (IOException e)
877 {
878 Log.ignore(e);
879 }
880 _selector=null;
881 }
882 }
883 }
884
885
886 private static class ChangeSelectableChannel
887 {
888 final SelectableChannel _channel;
889 final Object _attachment;
890
891 public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
892 {
893 super();
894 _channel = channel;
895 _attachment = attachment;
896 }
897 }
898
899
900 private interface ChangeTask
901 {
902 public void run();
903 }
904 }