View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.ClosedChannelException;
19  import java.nio.channels.SelectableChannel;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.SocketChannel;
22  
23  import org.mortbay.io.Buffer;
24  import org.mortbay.io.Connection;
25  import org.mortbay.io.nio.SelectorManager.SelectSet;
26  import org.mortbay.jetty.EofException;
27  import org.mortbay.jetty.HttpException;
28  import org.mortbay.log.Log;
29  import org.mortbay.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   * 
35   * @author gregw
36   *
37   */
38  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable
39  {
40      protected SelectorManager _manager; // taken from selectSet.getManager() in the constructor
41      protected SelectorManager.SelectSet _selectSet; // select set used for change scheduling, wakeups and idle timing
42      protected boolean _dispatched = false; // set by dispatch(boolean), cleared by undispatch()
43      protected boolean _writable = true; // false makes updateKey() request OP_WRITE
44      protected SelectionKey _key; // selection key of the channel; may be null or invalid
45      protected int _interestOps; // interest set computed by updateKey(), applied by doUpdateKey()
46      protected boolean _readBlocked; // true while a thread waits in blockReadable()
47      protected boolean _writeBlocked; // true while a thread waits in blockWritable()
48      protected Connection _connection; // created by _manager.newConnection() in the constructor
49  
50      private Timeout.Task _timeoutTask = new IdleTask(); // task handed to the select set by scheduleIdle()/cancelIdle()
51  
52      /* ------------------------------------------------------------ */
53      public Connection getConnection()
54      {
55          return _connection;
56      }
57      
58      /* ------------------------------------------------------------ */
/**
 * Creates an endpoint for the given channel, asks the manager for its connection and
 * reports the endpoint as opened.
 *
 * @param channel the socket channel wrapped by this endpoint
 * @param selectSet the select set this endpoint belongs to; its manager becomes this
 *        endpoint's manager
 * @param key the selection key to use for the channel
 */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
{
    super(channel);

    this._selectSet = selectSet;
    this._manager = selectSet.getManager();

    // NOTE(review): "this" escapes to the manager before _key is assigned below;
    // the original ordering is kept on purpose.
    this._connection = this._manager.newConnection(channel, this);

    this._manager.endPointOpened(this); // TODO not here!

    this._key = key;
}
71  
72      /* ------------------------------------------------------------ */
73      void dispatch() throws IOException
74      {
75          boolean dispatch_done = true;
76          try
77          {
78              if (dispatch(_manager.isDelaySelectKeyUpdate()))
79              {
80                  dispatch_done= false;
81                  dispatch_done = _manager.dispatch((Runnable)this);
82              }
83          }
84          finally
85          {
86              if (!dispatch_done)
87              {
88                  Log.warn("dispatch failed!");
89                  undispatch();
90              }
91          }
92      }
93      
94      /* ------------------------------------------------------------ */
95      /**
96       * Put the endpoint into the dispatched state.
97       * A blocked thread may be woken up by this call, or the endpoint placed in a state ready
98       * for a dispatch to a threadpool.
99       * @param assumeShortDispatch If true, the interested ops are not modified.
100      * @return True if the endpoint should be dispatched to a thread pool.
101      * @throws IOException
102      */
103     public boolean dispatch(boolean assumeShortDispatch) throws IOException
104     {
105         // If threads are blocked on this
106         synchronized (this)
107         {
108             if (_key == null || !_key.isValid())
109             {
110                 _readBlocked=false;
111                 _writeBlocked=false;
112                 this.notifyAll();
113                 return false;
114             }
115             
116             if (_readBlocked || _writeBlocked)
117             {
118                 if (_readBlocked && _key.isReadable())
119                     _readBlocked=false;
120                 if (_writeBlocked && _key.isWritable())
121                     _writeBlocked=false;
122 
123                 // wake them up is as good as a dispatched.
124                 this.notifyAll();
125                 
126                 // we are not interested in further selecting
127                 _key.interestOps(0);
128                 return false;
129             }
130 
131             if (!assumeShortDispatch)
132                 _key.interestOps(0);
133 
134             // Otherwise if we are still dispatched
135             if (_dispatched)
136             {
137                 // we are not interested in further selecting
138                 _key.interestOps(0);
139                 return false;
140             }
141 
142             // Remove writeable op
143             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
144             {
145                 // Remove writeable op
146                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
147                 _key.interestOps(_interestOps);
148                 _writable = true; // Once writable is in ops, only removed with dispatch.
149             }
150 
151             _dispatched = true;
152         }
153         return true;
154     }
155 
156     /* ------------------------------------------------------------ */
157     public void scheduleIdle()
158     {
159         _selectSet.scheduleIdle(_timeoutTask);
160     }
161 
162     /* ------------------------------------------------------------ */
163     public void cancelIdle()
164     {
165         _selectSet.cancelIdle(_timeoutTask);
166     }
167 
168 
169     /* ------------------------------------------------------------ */
170     protected void idleExpired()
171     {
172         try
173         {
174             close();
175         }
176         catch (IOException e)
177         {
178             Log.ignore(e);
179         }
180     }
181     
182     /* ------------------------------------------------------------ */
183     /**
184      * Called when a dispatched thread is no longer handling the endpoint. The selection key
185      * operations are updated.
186      */
187     public void undispatch()
188     {
189         synchronized (this)
190         {
191             try
192             {
193                 _dispatched = false;
194                 updateKey();
195             }
196             catch (Exception e)
197             {
198                 // TODO investigate if this actually is a problem?
199                 Log.ignore(e);
200                 _interestOps = -1;
201                 _selectSet.addChange(this);
202             }
203         }
204     }
205 
206     /* ------------------------------------------------------------ */
207     /*
208      */
209     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
210     {
211         int l = super.flush(header, buffer, trailer);
212         _writable = l > 0;
213         return l;
214     }
215 
216     /* ------------------------------------------------------------ */
217     /*
218      */
219     public int flush(Buffer buffer) throws IOException
220     {
221         int l = super.flush(buffer);
222         _writable = l > 0;
223         return l;
224     }
225 
226     /* ------------------------------------------------------------ */
227     /*
228      * Allows thread to block waiting for further events.
229      */
230     public boolean blockReadable(long timeoutMs) throws IOException
231     {
232         synchronized (this)
233         {
234             long start=_selectSet.getNow();
235             try
236             {   
237                 _readBlocked=true;
238                 while (isOpen() && _readBlocked)
239                 {
240                     try
241                     {
242                         updateKey();
243                         this.wait(timeoutMs);
244 
245                         if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
246                             return false;
247                     }
248                     catch (InterruptedException e)
249                     {
250                         Log.warn(e);
251                     }
252                 }
253             }
254             finally
255             {
256                 _readBlocked=false;
257             }
258         }
259         return true;
260     }
261 
262     /* ------------------------------------------------------------ */
263     /*
264      * Allows thread to block waiting for further events.
265      */
266     public boolean blockWritable(long timeoutMs) throws IOException
267     {
268         synchronized (this)
269         {
270             long start=_selectSet.getNow();
271             try
272             {   
273                 _writeBlocked=true;
274                 while (isOpen() && _writeBlocked)
275                 {
276                     try
277                     {
278                         updateKey();
279                         this.wait(timeoutMs);
280 
281                         if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
282                             return false;
283                     }
284                     catch (InterruptedException e)
285                     {
286                         Log.warn(e);
287                     }
288                 }
289             }
290             finally
291             {
292                 _writeBlocked=false;
293                 scheduleIdle();
294             }
295         }
296         return true;
297     }
298 
299     /* ------------------------------------------------------------ */
300     public void setWritable(boolean writable)
301     {
302         _writable=writable;
303     }
304     
305     /* ------------------------------------------------------------ */
306     public void scheduleWrite()
307     {
308         _writable=false;
309         updateKey();
310     }
311     
312     /* ------------------------------------------------------------ */
313     /**
314      * Updates selection key. Adds operations types to the selection key as needed. No operations
315      * are removed as this is only done during dispatch. This method records the new key and
316      * schedules a call to doUpdateKey to do the keyChange
317      */
318     private void updateKey()
319     {
320         synchronized (this)
321         {
322             int ops=-1;
323             if (getChannel().isOpen())
324             {
325                 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
326                 _interestOps = 
327                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
328                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
329             }
330             if(_interestOps == ops && getChannel().isOpen())
331                 return;
332             
333         }
334         _selectSet.addChange(this);
335         _selectSet.wakeup();
336     }
337     
338     /* ------------------------------------------------------------ */
339     /**
340      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
341      */
342     void doUpdateKey()
343     {
344         synchronized (this)
345         {
346             if (getChannel().isOpen())
347             {
348                 if (_interestOps>0)
349                 {
350                     if (_key==null || !_key.isValid())
351                     {
352                         SelectableChannel sc = (SelectableChannel)getChannel();
353                         if (sc.isRegistered())
354                         {
355                             updateKey();   
356                         }
357                         else
358                         {
359                             try
360                             {
361                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
362                             }
363                             catch (Exception e)
364                             {
365                                 Log.ignore(e);
366                                 if (_key!=null && _key.isValid())
367                                 {
368                                     _key.cancel();
369                                 }
370                                 cancelIdle();
371                                 _manager.endPointClosed(this);
372                                 _key = null;
373                             }
374                         }
375                     }
376                     else
377                     {
378                         _key.interestOps(_interestOps);
379                     }
380                 }
381                 else
382                 {
383                     if (_key!=null && _key.isValid())
384                         _key.interestOps(0);
385                     else
386                         _key=null;
387                 }
388             }
389             else    
390             {
391                 if (_key!=null && _key.isValid())
392                 {
393                     _key.interestOps(0);
394                     _key.cancel(); 
395                 }
396                 cancelIdle();
397                 _manager.endPointClosed(this);
398                 _key = null;
399             }
400         }
401     }
402 
403     /* ------------------------------------------------------------ */
404     /* 
405      */
406     public void run()
407     {
408         try
409         {
410             _connection.handle();
411         }
412         catch (ClosedChannelException e)
413         {
414             Log.ignore(e);
415         }
416         catch (EofException e)
417         {
418             Log.debug("EOF", e);
419             try{close();}
420             catch(IOException e2){Log.ignore(e2);}
421         }
422         catch (HttpException e)
423         {
424             Log.debug("BAD", e);
425             try{close();}
426             catch(IOException e2){Log.ignore(e2);}
427         }
428         catch (Throwable e)
429         {
430             Log.warn("handle failed", e);
431             try{close();}
432             catch(IOException e2){Log.ignore(e2);}
433         }
434         finally
435         {
436             undispatch();
437         }
438     }
439 
440     /* ------------------------------------------------------------ */
441     /*
442      * @see org.mortbay.io.nio.ChannelEndPoint#close()
443      */
444     public void close() throws IOException
445     {
446         try
447         {
448             super.close();
449         }
450         catch (IOException e)
451         {
452             Log.ignore(e);
453         }   
454         finally
455         {
456             updateKey();
457         }
458     }
459     
460     /* ------------------------------------------------------------ */
461     public String toString()
462     {
463         return "SCEP@" + hashCode() + "[d=" + _dispatched + ",io=" + _interestOps + ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
464     }
465 
466     /* ------------------------------------------------------------ */
467     public Timeout.Task getTimeoutTask()
468     {
469         return _timeoutTask;
470     }
471 
472     /* ------------------------------------------------------------ */
473     public SelectSet getSelectSet()
474     {
475         return _selectSet;
476     }
477 
478     /* ------------------------------------------------------------ */
479     /* ------------------------------------------------------------ */
480     /* ------------------------------------------------------------ */
481     public class IdleTask extends Timeout.Task 
482     {
483         /* ------------------------------------------------------------ */
484         /*
485          * @see org.mortbay.thread.Timeout.Task#expired()
486          */
487         public void expired()
488         {
489             idleExpired();
490         }
491 
492         public String toString()
493         {
494             return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
495         }
496 
497     }
498 
499 }