View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.CancelledKeyException;
19  import java.nio.channels.ClosedSelectorException;
20  import java.nio.channels.SelectableChannel;
21  import java.nio.channels.SelectionKey;
22  import java.nio.channels.Selector;
23  import java.nio.channels.ServerSocketChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.mortbay.component.AbstractLifeCycle;
30  import org.mortbay.io.Connection;
31  import org.mortbay.io.EndPoint;
32  import org.mortbay.log.Log;
33  import org.mortbay.thread.Timeout;
34  
35  
36  /* ------------------------------------------------------------ */
37  /**
38   * The Selector Manager manages a number of SelectSets to allow
39   * NIO scheduling to scale to large numbers of connections.
40   * 
41   * @author gregw
42   *
43   */
44  public abstract class SelectorManager extends AbstractLifeCycle
45  {
46      // TODO Tune these by approx system speed.
        // Each tuning constant is read once from the integer system property named in its declaration.
        // Count of suspicious (empty, early-returning) selects within a monitor period at which
        // SelectSet.doSelect() recreates the selector; beyond it a pause is injected instead.
47      private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.mortbay.io.nio.JVMBUG_THRESHHOLD",512).intValue();
        // Length in ms of the window over which selects are counted.
48      private static final int __MONITOR_PERIOD=Integer.getInteger("org.mortbay.io.nio.MONITOR_PERIOD",1000).intValue();
        // Selects per monitor period above which a select set pauses before each select.
49      private static final int __MAX_SELECTS=Integer.getInteger("org.mortbay.io.nio.MAX_SELECTS",15000).intValue();
        // Sleep in ms used for those pauses.
50      private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
        // Busy-key detection only runs when this is >0 (the default -1 disables it); it is the
        // repeat count after which a key that keeps being the only selected key is cancelled.
51      private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
52      
53      private boolean _delaySelectKeyUpdate=true;
54      private long _maxIdleTime; // idle timeout in ms used when not in low-resources mode
55      private long _lowResourcesConnections; // low-resources connection threshold, stored per select set
56      private long _lowResourcesMaxIdleTime; // idle timeout in ms used in low-resources mode
57      private transient SelectSet[] _selectSet; // created by doStart(), set to null by doStop()
58      private int _selectSets=1; // number of select sets doStart() creates
59      private volatile int _set; // counter the register() methods use to pick a select set in turn
60      
61      /* ------------------------------------------------------------ */
62      /**
         * Sets the idle timeout that the select sets apply while they are not in low-resources mode.
63       * @param maxIdleTime the maximum period, in milliseconds, that a connection may be idle before it is closed
64       * @see #setLowResourcesMaxIdleTime(long)
65       */
66      public void setMaxIdleTime(long maxIdleTime)
67      {
68          this._maxIdleTime = maxIdleTime;
69      }
70      
71      /* ------------------------------------------------------------ */
72      /**
         * Sets the number of select sets. The SelectSet array itself is only (re)built by doStart().
         * The low-resources connection limit is stored per select set, so it is re-divided here,
         * rounding up exactly as setLowResourcesConnections(long) does.
73       * @param selectSets the number of select sets; must be greater than zero
         * @throws IllegalArgumentException if selectSets is not positive
74       */
75      public void setSelectSets(int selectSets)
76      {
            // Reject bad values before touching any state: zero used to be stored first and then
            // caused a division by zero here and in setLowResourcesConnections().
            if (selectSets<=0)
                throw new IllegalArgumentException("selectSets must be > 0: "+selectSets);

77          long lrc = _lowResourcesConnections * _selectSets; 
78          _selectSets=selectSets;
            // Round up so that a small positive total limit is not truncated to 0, which
            // doSelect() treats as "low-resources handling disabled".
79          _lowResourcesConnections=(lrc+_selectSets-1)/_selectSets;
80      }
81      
82      /* ------------------------------------------------------------ */
83      /**
84       * @return the maximum idle period in milliseconds, as last given to setMaxIdleTime(long)
85       */
86      public long getMaxIdleTime()
87      {
88          return this._maxIdleTime;
89      }
90      
91      /* ------------------------------------------------------------ */
92      /**
93       * @return the configured number of select sets
94       */
95      public int getSelectSets()
96      {
97          return this._selectSets;
98      }
99      
100     /* ------------------------------------------------------------ */
101     /**
102      * @return the current value of the delaySelectKeyUpdate flag (true unless changed by its setter)
103      */
104     public boolean isDelaySelectKeyUpdate()
105     {
106         return this._delaySelectKeyUpdate;
107     }
108 
109     /* ------------------------------------------------------------ */
110     /** Register a channel with one of the select sets, picked in turn.
         * The channel is queued as a change on the chosen set and the set is woken up; the
         * selector registration itself is carried out by that set's doSelect().
         * If there are no select sets (manager not started, or stopped) the call does nothing.
111      * @param channel the channel to register
112      * @param att Attached Object, may be null
113      * @throws IOException declared by the signature; the code visible here does not throw it
114      */
115     public void register(SocketChannel channel, Object att) throws IOException
116     {
117         SelectSet[] sets=_selectSet;
118         if (sets==null || sets.length==0)
119             return;

            // _set++ is not atomic, but the counter is only used to spread load. Mask off the sign
            // bit so the index stays valid after the counter wraps, and take the modulo of the
            // array actually in use rather than of the configured count.
120         int s=(_set++ & Integer.MAX_VALUE)%sets.length;
121         SelectSet set=sets[s];
122         set.addChange(channel,att);
123         set.wakeup();
124     }
127     
128     /* ------------------------------------------------------------ */
129     /** Register a serverchannel with one of the select sets, picked in turn.
         * The channel is queued as a change on the chosen set and the set is woken up; the
         * selector registration for accept events is carried out by that set's doSelect().
130      * @param acceptChannel the listening channel to register
131      * @throws IllegalStateException if there are no select sets (manager not started, or stopped)
132      * @throws IOException declared by the signature; the code visible here does not throw it
133      */
134     public void register(ServerSocketChannel acceptChannel) throws IOException
135     {
            // Work on a snapshot: doStop() sets the field to null.
136         SelectSet[] sets=_selectSet;
            // Fail loudly rather than with a bare NullPointerException; dropping the request
            // silently would leave a listening channel that is never selected.
137         if (sets==null || sets.length==0)
                throw new IllegalStateException("SelectorManager not started");

            // Mask off the sign bit so the index stays valid after the counter wraps.
138         int s=(_set++ & Integer.MAX_VALUE)%sets.length;
            SelectSet set=sets[s];
139         set.addChange(acceptChannel);
140         set.wakeup();
141     }
142 
143     /* ------------------------------------------------------------ */
144     /**
145      * @return the lowResourcesConnections, i.e. the per select set threshold multiplied by the number of select sets
146      */
147     public long getLowResourcesConnections()
148     {
            final long perSet=_lowResourcesConnections;
149         return perSet*_selectSets;
150     }
151 
152     /* ------------------------------------------------------------ */
153     /**
154      * Set the number of connections, which if exceeded places this manager in low resources state.
155      * This is not an exact measure as the connection count is averaged over the select sets:
         * the value is divided by the number of select sets, rounding up, and stored per set.
156      * @param lowResourcesConnections the number of connections
157      * @see #setLowResourcesMaxIdleTime(long)
158      */
159     public void setLowResourcesConnections(long lowResourcesConnections)
160     {
            final int sets=_selectSets;
            final long roundedUp=lowResourcesConnections+sets-1;
161         _lowResourcesConnections=roundedUp/sets;
162     }
163 
164     /* ------------------------------------------------------------ */
165     /**
166      * @return the lowResourcesMaxIdleTime in milliseconds
167      */
168     public long getLowResourcesMaxIdleTime()
169     {
170         return this._lowResourcesMaxIdleTime;
171     }
172 
173     /* ------------------------------------------------------------ */
174     /**
175      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when a SelectSet has more connections than its share of {@link #getLowResourcesConnections()}
176      * @see #setMaxIdleTime(long)
177      */
178     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
179     {
180         this._lowResourcesMaxIdleTime = lowResourcesMaxIdleTime;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /**
         * Runs one select pass on the select set with the given index.
         * Does nothing if the manager has no select sets, if the index is outside the current
         * array, or if that slot has not been filled.
185      * @param acceptorID index of the select set to run
186      * @throws IOException if the select pass fails
187      */
188     public void doSelect(int acceptorID) throws IOException
189     {
190         SelectSet[] sets= _selectSet;
            // Treat a negative index like a too-large one instead of letting it throw
            // ArrayIndexOutOfBoundsException.
191         if (sets==null || acceptorID<0 || acceptorID>=sets.length)
                return;

            SelectSet set=sets[acceptorID];
            if (set!=null)
192             set.doSelect();
193     }
194 
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @param delaySelectKeyUpdate the new value of the flag reported by {@link #isDelaySelectKeyUpdate()}
199      */
200     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
201     {
202         this._delaySelectKeyUpdate = delaySelectKeyUpdate;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /**
         * Accepts a connection for a selection key that is ready for accept.
         * SelectSet.doSelect() calls this for a selected key whose attachment is not an endpoint
         * and which reports isAcceptable(); the returned channel is then switched to non-blocking
         * mode and handed to a select set.
207      * @param key the acceptable selection key
208      * @return the accepted channel, or null if there is nothing to accept (the key is then skipped)
209      * @throws IOException 
210      */
211     protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
212 
213     /* ------------------------------------------------------------------------------- */
        /**
         * Hands a task over for execution.
         * SelectSet.doSelect() uses this for Runnable entries found in the change list and for
         * the task that closes a busy endpoint; it ignores the return value in both places.
         * @param task the task to run
         * @return presumably whether the task was accepted for execution - the meaning is not
         *         defined in this part of the file, confirm against the implementations
         * @throws IOException
         */
214     public abstract boolean dispatch(Runnable task) throws IOException;
215 
216     /* ------------------------------------------------------------ */
217     /* (non-Javadoc)
         * Creates one SelectSet per configured select set, then starts the superclass.
218      * @see org.mortbay.component.AbstractLifeCycle#doStart()
219      */
220     protected void doStart() throws Exception
221     {
            // The array is published first and then filled in place.
222         _selectSet=new SelectSet[_selectSets];
223         int id=0;
            while (id<_selectSet.length)
            {
224             _selectSet[id]=new SelectSet(id);
                id++;
            }
225 
226         super.doStart();
227     }
228 
229 
230     /* ------------------------------------------------------------------------------- */
231     protected void doStop() throws Exception
232     {
233         SelectSet[] sets= _selectSet;
234         _selectSet=null;
235         if (sets!=null)
236             for (int i=0;i<sets.length;i++)
237             {
238                 SelectSet set = sets[i];
239                 if (set!=null)
240                     set.stop();
241             }
242         super.doStop();
243     }
244 
245     /* ------------------------------------------------------------ */
246     /**
         * Callback for a closed endpoint.
         * NOTE(review): the call sites are not in this part of the file; presumably invoked once
         * an endpoint created by this manager has been closed - confirm against SelectChannelEndPoint.
247      * @param endpoint the endpoint concerned
248      */
249     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
250 
251     /* ------------------------------------------------------------ */
252     /**
         * Callback for an opened endpoint.
         * NOTE(review): the call sites are not in this part of the file; presumably invoked once
         * a new endpoint has been created - confirm against the newEndPoint implementations.
253      * @param endpoint the endpoint concerned
254      */
255     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
256 
257     /* ------------------------------------------------------------------------------- */
        /**
         * Factory for the Connection that belongs to a channel/endpoint pair.
         * NOTE(review): not called in this part of the file; presumably used by the endpoint
         * when it is set up - confirm against SelectChannelEndPoint.
         * @param channel the socket channel of the connection
         * @param endpoint the endpoint wrapping the channel
         * @return the connection object
         */
258     protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
259 
260     /* ------------------------------------------------------------ */
261     /**
         * Factory for the endpoint that wraps a channel registered with a select set.
         * SelectSet.doSelect() calls this after registering a connected or accepted channel and
         * after a connect has finished; it attaches the result to the key and dispatches it.
262      * @param channel the registered socket channel
263      * @param selectSet the select set the channel is registered with
264      * @param sKey the selection key of that registration
265      * @return the new endpoint
266      * @throws IOException
267      */
268     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
269 
270     /* ------------------------------------------------------------------------------- */
271     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
272     {
273         Log.warn(ex);
274     }
275     
276     /* ------------------------------------------------------------------------------- */
277     /* ------------------------------------------------------------------------------- */
278     /* ------------------------------------------------------------------------------- */
279     public class SelectSet 
280     {
281         private transient int _change; // index (0 or 1) of the list in _changes that receives new changes
282         private transient List[] _changes; // two lists; doSelect() swaps them and drains the other one
283         private transient Timeout _idleTimeout; // duration is switched in doSelect() between normal and low-resources idle time
284         private transient int _nextSet; // index of the select set that gets the next accepted channel
285         private transient Timeout _retryTimeout; // created with a duration of 0
286         private transient Selector _selector; // replaced when doSelect() recreates the selector
287         private transient int _setID; // index of this set, as passed to the constructor
288         private volatile boolean _selecting; // true from the start of doSelect() until it returns
289         private transient int _jvmBug; // suspicious selects seen in the current monitor period
290         private int _selects; // selects done in the current monitor period
291         private long _monitorStart; // start of the current monitor period (ms)
292         private long _monitorNext; // time at which the current monitor period ends (ms)
293         private boolean _pausing; // when true doSelect() sleeps __BUSY_PAUSE ms before selecting
294         private SelectionKey _busyKey; // key seen by the previous busy-key check
295         private int _busyKeyCount; // consecutive busy-key checks that saw _busyKey
296         private long _log; // time of the next statistics log (ms)
297         private int _paused; // monitor periods that ended in pausing mode since the last log
298         private int _jvmFix0; // passes that cancelled keys with no interest ops since the last log
299         private int _jvmFix1; // selector recreations since the last log
300         private int _jvmFix2; // times the delay work-around started since the last log
301         
302         /* ------------------------------------------------------------ */
303         SelectSet(int acceptorID) throws Exception
304         {
305             _setID=acceptorID;
306 
307             _idleTimeout = new Timeout(this);
308             _idleTimeout.setDuration(getMaxIdleTime());
309             _retryTimeout = new Timeout(this);
310             _retryTimeout.setDuration(0L);
311 
312             // create a selector;
313             _selector = Selector.open();
314             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
315             _change=0;
316             _monitorStart=System.currentTimeMillis();
317             _monitorNext=_monitorStart+__MONITOR_PERIOD;
318             _log=_monitorStart+60000;
319         }
320         
321         /* ------------------------------------------------------------ */
            /**
             * Queues a change for the next doSelect() pass.
             * @param point the change object, appended to the currently active change list
             */
322         public void addChange(Object point)
323         {
324             synchronized (_changes)
325             {
                    final List active=_changes[_change];
326                 active.add(point);
327             }
328         }
329         
330         /* ------------------------------------------------------------ */
331         public void addChange(SelectableChannel channel, Object att)
332         {   
333             if (att==null)
334                 addChange(channel);
335             else if (att instanceof EndPoint)
336                 addChange(att);
337             else
338                 addChange(new ChangeSelectableChannel(channel,att));
339         }
340         
341         /* ------------------------------------------------------------ */
            /**
             * Cancels a timeout task while holding this select set's monitor, the same monitor
             * doSelect() holds while it updates the timeouts.
             * @param task the task to cancel
             */
342         public void cancelIdle(Timeout.Task task)
343         {
                final Timeout.Task idleTask=task;
344             synchronized (this)
345             {
346                 idleTask.cancel();
347             }
348         }
349 
350         /* ------------------------------------------------------------ */
351         /**
352          * Select and dispatch tasks found from changes and the selector.
             * <p>
             * One call is one pass: swap the change lists and apply the queued changes, refresh
             * the idle and retry timeouts, do a single select (blocking for at most one second, or
             * selectNow() when a timeout is due within 2ms), handle every selected key and tick
             * the timers. A pass returns early after applying one of the JVM selector bug
             * work-arounds, or when the selector has been closed in the meantime.
353          * 
354          * @throws IOException if the selector or an operation outside the per-change and
             *                     per-key handlers fails
355          */
356         public void doSelect() throws IOException
357         {
358             SelectionKey key=null;
359 
360             try
361             {
362                 List changes;
363                 final Selector selector;
364                 synchronized (_changes)
365                 {
                        // Take the filled list and redirect new changes to the other one.
366                     changes=_changes[_change];
367                     _change=_change==0?1:0;
368                     _selecting=true;
369                     selector=_selector;
370                 }
371 
372                 // Make any key changes required
373                 try
374                 {
375                     for (int i = 0; i < changes.size(); i++)
376                     {
377                         try
378                         {
379                             Object o = changes.get(i);
380 
381                             if (o instanceof EndPoint)
382                             {
383                                 // Update the operations for a key.
384                                 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
385                                 endpoint.doUpdateKey();
386                             }
387                             else if (o instanceof Runnable)
388                             {
389                                 dispatch((Runnable)o);
390                             }
391                             else if (o instanceof ChangeSelectableChannel)
392                             {
393                                 // finish accepting/connecting this connection
394                                 final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
395                                 final SelectableChannel channel=asc._channel;
396                                 final Object att = asc._attachment;
397 
398                                 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
399                                 {
400                                     key = channel.register(selector,SelectionKey.OP_READ,att);
401                                     SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
402                                     key.attach(endpoint);
403                                     endpoint.dispatch();
404                                 }
405                                 else if (channel.isOpen())
406                                 {
407                                     channel.register(selector,SelectionKey.OP_CONNECT,att);
408                                 }
409                             }
410                             else if (o instanceof SocketChannel)
411                             {
412                                 final SocketChannel channel=(SocketChannel)o;
413 
414                                 if (channel.isConnected())
415                                 {
416                                     key = channel.register(selector,SelectionKey.OP_READ,null);
417                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
418                                     key.attach(endpoint);
419                                     endpoint.dispatch();
420                                 }
421                                 else if (channel.isOpen())
422                                 {
423                                     channel.register(selector,SelectionKey.OP_CONNECT,null);
424                                 }
425                             }
426                             else if (o instanceof ServerSocketChannel)
427                             {
428                                 ServerSocketChannel channel = (ServerSocketChannel)o;
429                                 channel.register(getSelector(),SelectionKey.OP_ACCEPT);
430                             }
431                             else if (o instanceof ChangeTask)
432                             {
433                                 ((ChangeTask)o).run();
434                             }
435                             else
436                                 throw new IllegalArgumentException(o.toString());
437                         }
438                         catch (Exception e)
439                         {
                                // One bad change must not stop the remaining ones.
440                             if (isRunning())
441                                 Log.warn(e);
442                             else
443                                 Log.debug(e);
444                         }
445                         catch (Error e)
446                         {
447                             if (isRunning())
448                                 Log.warn(e);
449                             else
450                                 Log.debug(e);
451                         }
452                     }
453                 }
454                 finally
455                 {
456                     changes.clear();
457                 }
458 
459                 long idle_next = 0;
460                 long retry_next = 0;
461                 long now=System.currentTimeMillis();
462                 synchronized (this)
463                 {
464                     _idleTimeout.setNow(now);
465                     _retryTimeout.setNow(now);
                        // Use the shorter low-resources idle time while this set holds more keys than its threshold.
466                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
467                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
468                     else 
469                         _idleTimeout.setDuration(_maxIdleTime);
470                     idle_next=_idleTimeout.getTimeToNext();
471                     retry_next=_retryTimeout.getTimeToNext();
472                 }
473 
474                 // work out how long to wait in select
475                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
476                 if (idle_next >= 0 && wait > idle_next)
477                     wait = idle_next;
478                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
479                     wait = retry_next;
480 
481                 // Do the select.
482                 if (wait > 2) // TODO tune or configure this
483                 {
484                     // If we are in pausing mode
485                     if (_pausing)
486                     {
487                         try
488                         {
489                             Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
490                         }
491                         catch(InterruptedException e)
492                         {
493                             Log.ignore(e);
494                         }
495                     }
496 
497                     long before=now;
498                     int selected=selector.select(wait);
499                     now = System.currentTimeMillis();
500                     _idleTimeout.setNow(now);
501                     _retryTimeout.setNow(now);
502                     _selects++;
503 
504                     // Look for JVM bugs over a monitor period.
505                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
506                     // http://bugs.sun.com/view_bug.do?bug_id=6693490
507                     if (now>_monitorNext)
508                     {
                            // Normalise the count to selects per monitor period. The product is
                            // computed as a long: as an int it overflows once a spinning selector
                            // exceeds roughly 2 million selects in a period, which would turn
                            // pausing off exactly when it is needed.
509                         _selects=(int)((long)_selects*__MONITOR_PERIOD/(now-_monitorStart));
510                         _pausing=_selects>__MAX_SELECTS;
511                         if (_pausing)
512                             _paused++;
513 
514                         _selects=0;
515                         _jvmBug=0;
516                         _monitorStart=now;
517                         _monitorNext=now+__MONITOR_PERIOD;
518                     }
519 
520                     if (now>_log)
521                     {
522                         if (_paused>0)  
523                             Log.info(this+" Busy selector - injecting delay "+_paused+" times");
524 
525                         if (_jvmFix2>0)
526                             Log.info(this+" JVM BUG(s) - injecting delay "+_jvmFix2+" times");
527 
528                         if (_jvmFix1>0)
529                             Log.info(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, canceled keys "+_jvmFix0+" times");
530 
531                         else if(Log.isDebugEnabled() && _jvmFix0>0)
532                             Log.info(this+" JVM BUG(s) - canceled keys "+_jvmFix0+" times");
533                         _paused=0;
534                         _jvmFix2=0;
535                         _jvmFix1=0;
536                         _jvmFix0=0;
537                         _log=now+60000;
538                     }
539 
540                     // If we see signature of possible JVM bug, increment count.
541                     if (selected==0 && wait>10 && (now-before)<(wait/2))
542                     {
543                         // Increment bug count and try a work around
544                         _jvmBug++;
545                         if (_jvmBug>(__JVMBUG_THRESHHOLD))
546                         {
547                             try
548                             {
549                                 if (_jvmBug==__JVMBUG_THRESHHOLD+1)
550                                     _jvmFix2++;
551 
552                                 Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
553                             }
554                             catch(InterruptedException e)
555                             {
556                                 Log.ignore(e);
557                             }
558                         }
559                         else if (_jvmBug==__JVMBUG_THRESHHOLD)
560                         {
561                             synchronized (this)
562                             {
563                                 // BLOODY SUN BUG !!!  Try refreshing the entire selector.
564                                 _jvmFix1++;
565 
566                                 final Selector new_selector = Selector.open();
567                                 Iterator iterator = _selector.keys().iterator();
568                                 while (iterator.hasNext())
569                                 {
570                                     SelectionKey k = (SelectionKey)iterator.next();
571                                     if (!k.isValid() || k.interestOps()==0)
572                                         continue;
573 
574                                     final SelectableChannel channel = k.channel();
575                                     final Object attachment = k.attachment();
576 
                                        // Queue the channel again; it is registered with the new
                                        // selector when the changes are applied on the next pass.
577                                     if (attachment==null)
578                                         addChange(channel);
579                                     else
580                                         addChange(channel,attachment);
581                                 }
582                                 Selector old_selector=_selector;
583                                 _selector=new_selector;
584                                 try 
585                                 {
586                                     old_selector.close();
587                                 }
588                                 catch(Exception e)
589                                 {
590                                     Log.warn(e);
591                                 }
592                                 return;
593                             }
594                         }
595                         else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
596                         {
597                             // Cancel keys with 0 interested ops
598                             int cancelled=0;
599                             Iterator iter = selector.keys().iterator();
600                             while(iter.hasNext())
601                             {
602                                 SelectionKey k = (SelectionKey) iter.next();
603                                 if (k.isValid()&&k.interestOps()==0)
604                                 {
605                                     k.cancel();
606                                     cancelled++;
607                                 }
608                             }
609                             if (cancelled>0)
610                                 _jvmFix0++;
611 
612                             return;
613                         }
614                     }
615                     else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
616                     {
617                         // Look for busy key
618                         SelectionKey busy = (SelectionKey)selector.selectedKeys().iterator().next();
619                         if (busy==_busyKey)
620                         {
621                             if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
622                             {
623                                 final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
624                                 Log.warn("Busy Key "+busy.channel()+" "+endpoint);
625                                 busy.cancel();
626                                 if (endpoint!=null)
627                                 {
628                                     dispatch(new Runnable()
629                                     {
630                                         public void run()
631                                         {
632                                             try
633                                             {
634                                                 endpoint.close();
635                                             }
636                                             catch (IOException e)
637                                             {
638                                                 Log.ignore(e);
639                                             }
640                                         }
641                                     });
642                                 }
643                             }
644                         }
645                         else
646                             _busyKeyCount=0;
647                         _busyKey=busy;
648                     }
649                 }
650                 else 
651                 {
652                     selector.selectNow();
653                     _selects++;
654                 }
655 
656                 // have we been destroyed while sleeping
657                 if (_selector==null || !selector.isOpen())
658                     return;
659 
660                 // Look for things to do
661                 Iterator iter = selector.selectedKeys().iterator();
662                 while (iter.hasNext())
663                 {
664                     key = (SelectionKey) iter.next();
665 
666                     try
667                     {
668                         if (!key.isValid())
669                         {
670                             key.cancel();
                                // Only an endpoint can update its key. Check the type instead of
                                // casting blindly, so another kind of attachment no longer ends
                                // in a ClassCastException and a spurious warning.
671                             Object invalid = key.attachment();
672                             if (invalid instanceof SelectChannelEndPoint)
673                                 ((SelectChannelEndPoint)invalid).doUpdateKey();
674                             continue;
675                         }
676 
677                         Object att = key.attachment();
678 
679                         if (att instanceof SelectChannelEndPoint)
680                         {
681                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
682                             endpoint.dispatch();
683                         }
684                         else if (key.isAcceptable())
685                         {
686                             SocketChannel channel = acceptChannel(key);
687                             if (channel==null)
688                                 continue;
689 
690                             channel.configureBlocking(false);
691 
692                             // TODO make it reluctant to leave 0
693                             _nextSet=++_nextSet%_selectSet.length;
694 
695                             // Is this for this selectset
696                             if (_nextSet==_setID)
697                             {
698                                 // bind connections to this select set.
699                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
700                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
701                                 cKey.attach(endpoint);
702                                 if (endpoint != null)
703                                     endpoint.dispatch();
704                             }
705                             else
706                             {
707                                 // nope - give it to another.
708                                 _selectSet[_nextSet].addChange(channel);
709                                 _selectSet[_nextSet].wakeup();
710                             }
711                         }
712                         else if (key.isConnectable())
713                         {
714                             // Complete a connection of a registered channel
715                             SocketChannel channel = (SocketChannel)key.channel();
716                             boolean connected=false;
717                             try
718                             {
719                                 connected=channel.finishConnect();
720                             }
721                             catch(Exception e)
722                             {
723                                 connectionFailed(channel,e,att);
724                             }
725                             finally
726                             {
727                                 if (connected)
728                                 {
729                                     key.interestOps(SelectionKey.OP_READ);
730                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
731                                     key.attach(endpoint);
732                                     endpoint.dispatch();
733                                 }
734                                 else
735                                 {
736                                     key.cancel();
737                                 }
738                             }
739                         }
740                         else
741                         {
742                             // Wrap readable registered channel in an endpoint
743                             SocketChannel channel = (SocketChannel)key.channel();
744                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
745                             key.attach(endpoint);
746                             if (key.isReadable())
747                                 endpoint.dispatch();                           
748                         }
                            // Handled without error: clear the reference.
749                         key = null;
750                     }
751                     catch (CancelledKeyException e)
752                     {
753                         Log.ignore(e);
754                     }
755                     catch (Exception e)
756                     {
757                         if (isRunning())
758                             Log.warn(e);
759                         else
760                             Log.ignore(e);
761 
                            // Give up on a key whose handling failed, but never on the accept key.
762                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
763                         {
764                             key.interestOps(0);
765 
766                             key.cancel();
767                         } 
768                     }
769                 }
770 
771                 // Everything always handled
772                 selector.selectedKeys().clear();
773 
774                 // tick over the timers
775                 _idleTimeout.tick(now);
776                 _retryTimeout.tick(now);
777 
778             }
779             catch (ClosedSelectorException e)
780             {
781                 Log.warn(e);
782             }
783             catch (CancelledKeyException e)
784             {
785                 Log.ignore(e);
786             }
787             finally
788             {
789                 _selecting=false;
790             }
791         }
792 
793         /* ------------------------------------------------------------ */
794         public SelectorManager getManager()
795         {
796             return SelectorManager.this;
797         }
798 
799         /* ------------------------------------------------------------ */
800         public long getNow()
801         {
802             return _idleTimeout.getNow();
803         }
804         
805         /* ------------------------------------------------------------ */
806         public void scheduleIdle(Timeout.Task task)
807         {
808             synchronized (this)
809             {
810                 if (_idleTimeout.getDuration() <= 0)
811                     return;
812                 
813                 task.schedule(_idleTimeout);
814             }
815         }
816 
817         /* ------------------------------------------------------------ */
818         public void scheduleTimeout(Timeout.Task task, long timeout)
819         {
820             synchronized (this)
821             {
822                 _retryTimeout.schedule(task, timeout);
823             }
824         }
825 
826         /* ------------------------------------------------------------ */
827         public void wakeup()
828         {
829             Selector selector = _selector;
830             if (selector!=null)
831                 selector.wakeup();
832         }
833 
834         /* ------------------------------------------------------------ */
835         Selector getSelector()
836         {
837             return _selector;
838         }
839         
840         /* ------------------------------------------------------------ */
841         void stop() throws Exception
842         {
843             boolean selecting=true;
844             while(selecting)
845             {
846                 wakeup();
847                 selecting=_selecting;
848             }
849             
850             ArrayList keys=new ArrayList(_selector.keys());
851             Iterator iter =keys.iterator();
852 
853             while (iter.hasNext())
854             {
855                 SelectionKey key = (SelectionKey)iter.next();
856                 if (key==null)
857                     continue;
858                 Object att=key.attachment();
859                 if (att instanceof EndPoint)
860                 {
861                     EndPoint endpoint = (EndPoint)att;
862                     try
863                     {
864                         endpoint.close();
865                     }
866                     catch(IOException e)
867                     {
868                         Log.ignore(e);
869                     }
870                 }
871             }
872             
873             synchronized (this)
874             {
875                 selecting=_selecting;
876                 while(selecting)
877                 {
878                     wakeup();
879                     selecting=_selecting;
880                 }
881                 
882                 _idleTimeout.cancelAll();
883                 _retryTimeout.cancelAll();
884                 try
885                 {
886                     if (_selector != null)
887                         _selector.close();
888                 }
889                 catch (IOException e)
890                 {
891                     Log.ignore(e);
892                 } 
893                 _selector=null;
894             }
895         }
896     }
897 
898     /* ------------------------------------------------------------ */
899     private static class ChangeSelectableChannel
900     {
901         final SelectableChannel _channel;
902         final Object _attachment;
903         
904         public ChangeSelectableChannel(SelectableChannel channel, Object attachment)
905         {
906             super();
907             _channel = channel;
908             _attachment = attachment;
909         }
910     }
911 
912     /* ------------------------------------------------------------ */
913     private interface ChangeTask
914     {
915         public void run();
916     }
917 }