View Javadoc

1   // ========================================================================
2   // Copyright 2003-2005 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.jetty.nio;
16  
17  import java.io.IOException;
18  import java.net.InetSocketAddress;
19  import java.net.Socket;
20  import java.nio.channels.SelectionKey;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  
24  import org.mortbay.io.Connection;
25  import org.mortbay.io.nio.SelectChannelEndPoint;
26  import org.mortbay.io.nio.SelectorManager;
27  import org.mortbay.io.nio.SelectorManager.SelectSet;
28  import org.mortbay.jetty.HttpConnection;
29  import org.mortbay.jetty.Request;
30  import org.mortbay.jetty.RetryRequest;
31  import org.mortbay.log.Log;
32  import org.mortbay.thread.Timeout;
33  import org.mortbay.util.ajax.Continuation;
34  
35  /* ------------------------------------------------------------------------------- */
36  /**
37   * Selecting NIO connector.
38   * <p>
39   * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
40   * are used and threads are only allocated to connections with requests. Synchronization is used to
41   * simulate blocking for the servlet API, and any unflushed content at the end of request handling
42   * is written asynchronously.
43   * </p>
44   * <p>
45   * This connector is best used when there are a many connections that have idle periods.
46   * </p>
47   * <p>
48   * When used with {@link org.mortbay.util.ajax.Continuation}, threadless waits are supported. When
49   * a filter or servlet calls getEvent on a Continuation, a {@link org.mortbay.jetty.RetryRequest}
50   * runtime exception is thrown to allow the thread to exit the current request handling. Jetty will
51   * catch this exception and will not send a response to the client. Instead the thread is released
52   * and the Continuation is placed on the timer queue. If the Continuation timeout expires, or it's
53   * resume method is called, then the request is again allocated a thread and the request is retried.
54   * The limitation of this approach is that request content is not available on the retried request,
55   * thus if possible it should be read after the continuation or saved as a request attribute or as the
56   * associated object of the Continuation instance.
57   * </p>
58   * 
59   * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
60   * 
61   * @author gregw
62   *
63   */
64  public class SelectChannelConnector extends AbstractNIOConnector 
65  {
66      protected transient ServerSocketChannel _acceptChannel;
67      private long _lowResourcesConnections;
68      private long _lowResourcesMaxIdleTime;
69  
70      private SelectorManager _manager = new SelectorManager()
71      {
72          protected SocketChannel acceptChannel(SelectionKey key) throws IOException
73          {
74              // TODO handle max connections
75              SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
76              if (channel==null)
77                  return null;
78              channel.configureBlocking(false);
79              Socket socket = channel.socket();
80              configure(socket);
81              return channel;
82          }
83  
84          public boolean dispatch(Runnable task) throws IOException
85          {
86              return getThreadPool().dispatch(task);
87          }
88  
89          protected void endPointClosed(SelectChannelEndPoint endpoint)
90          {
91              // TODO handle max connections and low resources
92              connectionClosed((HttpConnection)endpoint.getConnection());
93          }
94  
95          protected void endPointOpened(SelectChannelEndPoint endpoint)
96          {
97              // TODO handle max connections and low resources
98              connectionOpened((HttpConnection)endpoint.getConnection());
99          }
100 
101         protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
102         {
103             return SelectChannelConnector.this.newConnection(channel,endpoint);
104         }
105 
106         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
107         {
108             return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
109         }
110     };
111     
112     /* ------------------------------------------------------------------------------- */
113     /**
114      * Constructor.
115      * 
116      */
117     public SelectChannelConnector()
118     {
119     }
120     
121     /* ------------------------------------------------------------ */
122     public void accept(int acceptorID) throws IOException
123     {
124         _manager.doSelect(acceptorID);
125     }
126     
127     /* ------------------------------------------------------------ */
128     public void close() throws IOException
129     {
130         synchronized(this)
131         {
132             if(_manager.isRunning())
133             {
134                 try
135                 {
136                     _manager.stop();
137                 }
138                 catch (Exception e)
139                 {
140                     Log.warn(e);
141                 }
142             }
143             if (_acceptChannel != null)
144                 _acceptChannel.close();
145             _acceptChannel = null;
146         }
147     }
148     
149     /* ------------------------------------------------------------------------------- */
150     public void customize(org.mortbay.io.EndPoint endpoint, Request request) throws IOException
151     {
152         ConnectorEndPoint cep = ((ConnectorEndPoint)endpoint);
153         cep.cancelIdle();
154         request.setTimeStamp(cep.getSelectSet().getNow());
155         super.customize(endpoint, request);
156     }
157     
158     /* ------------------------------------------------------------------------------- */
159     public void persist(org.mortbay.io.EndPoint endpoint) throws IOException
160     {
161         ((ConnectorEndPoint)endpoint).scheduleIdle();
162         super.persist(endpoint);
163     }
164 
165     /* ------------------------------------------------------------ */
166     public Object getConnection()
167     {
168         return _acceptChannel;
169     }
170     
171     /* ------------------------------------------------------------ */
172     /** Get delay select key update
173      * If true, the select set is not updated when a endpoint is dispatched for
174      * reading. The assumption is that the task will be short and thus will probably
175      * be complete before the select is tried again.
176      * @return Returns the assumeShortDispatch.
177      */
178     public boolean getDelaySelectKeyUpdate()
179     {
180         return _manager.isDelaySelectKeyUpdate();
181     }
182 
183     /* ------------------------------------------------------------------------------- */
184     public int getLocalPort()
185     {
186         synchronized(this)
187         {
188             if (_acceptChannel==null || !_acceptChannel.isOpen())
189                 return -1;
190             return _acceptChannel.socket().getLocalPort();
191         }
192     }
193 
194     /* ------------------------------------------------------------ */
195     /*
196      * @see org.mortbay.jetty.Connector#newContinuation()
197      */
198     public Continuation newContinuation()
199     {
200         return new RetryContinuation();
201     }
202 
203     /* ------------------------------------------------------------ */
204     public void open() throws IOException
205     {
206         synchronized(this)
207         {
208             if (_acceptChannel == null)
209             {
210                 // Create a new server socket
211                 _acceptChannel = ServerSocketChannel.open();
212 
213                 // Bind the server socket to the local host and port
214                 _acceptChannel.socket().setReuseAddress(getReuseAddress());
215                 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
216                 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
217 
218                 // Set to non blocking mode
219                 _acceptChannel.configureBlocking(false);
220                 
221             }
222         }
223     }
224 
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * @param delay If true, updating a {@link SelectionKey} is delayed until a redundant event is 
229      * schedules.  This is an optimization that assumes event handling can be completed before the next select
230      * completes.
231      */
232     public void setDelaySelectKeyUpdate(boolean delay)
233     {
234         _manager.setDelaySelectKeyUpdate(delay);
235     }
236 
237     /* ------------------------------------------------------------ */
238     public void setMaxIdleTime(int maxIdleTime)
239     {
240         _manager.setMaxIdleTime(maxIdleTime);
241         super.setMaxIdleTime(maxIdleTime);
242     }
243 
244 
245     /* ------------------------------------------------------------ */
246     /**
247      * @return the lowResourcesConnections
248      */
249     public long getLowResourcesConnections()
250     {
251         return _lowResourcesConnections;
252     }
253 
254     /* ------------------------------------------------------------ */
255     /**
256      * Set the number of connections, which if exceeded places this manager in low resources state.
257      * This is not an exact measure as the connection count is averaged over the select sets.
258      * @param lowResourcesConnections the number of connections
259      * @see {@link #setLowResourcesMaxIdleTime(long)}
260      */
261     public void setLowResourcesConnections(long lowResourcesConnections)
262     {
263         _lowResourcesConnections=lowResourcesConnections;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /**
268      * @return the lowResourcesMaxIdleTime
269      */
270     public long getLowResourcesMaxIdleTime()
271     {
272         return _lowResourcesMaxIdleTime;
273     }
274 
275     /* ------------------------------------------------------------ */
276     /**
277      * Set the period in ms that a connection is allowed to be idle when this there are more
278      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
279      * in order to gracefully handle high load situations.
280      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
281      * @see {@link #setMaxIdleTime(long)}
282      * @deprecated use {@link #setLowResourceMaxIdleTime(int)}
283      */
284     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
285     {
286         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
287         super.setLowResourceMaxIdleTime((int)lowResourcesMaxIdleTime); // TODO fix the name duplications
288     }
289 
290     /* ------------------------------------------------------------ */
291     /**
292      * Set the period in ms that a connection is allowed to be idle when this there are more
293      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
294      * in order to gracefully handle high load situations.
295      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
296      * @see {@link #setMaxIdleTime(long)}
297      */
298     public void setLowResourceMaxIdleTime(int lowResourcesMaxIdleTime)
299     {
300         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
301         super.setLowResourceMaxIdleTime(lowResourcesMaxIdleTime); 
302     }
303     
304     /* ------------------------------------------------------------ */
305     /*
306      * @see org.mortbay.jetty.AbstractConnector#doStart()
307      */
308     protected void doStart() throws Exception
309     {
310         _manager.setSelectSets(getAcceptors());
311         _manager.setMaxIdleTime(getMaxIdleTime());
312         _manager.setLowResourcesConnections(getLowResourcesConnections());
313         _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
314         _manager.start();
315         open();
316         _manager.register(_acceptChannel);
317         super.doStart();
318     }
319 
320     /* ------------------------------------------------------------ */
321     /*
322      * @see org.mortbay.jetty.AbstractConnector#doStop()
323      */
324     protected void doStop() throws Exception
325     {        
326         super.doStop();
327     }
328 
329     /* ------------------------------------------------------------ */
330     protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
331     {
332         return new ConnectorEndPoint(channel,selectSet,key);
333     }
334 
335     /* ------------------------------------------------------------------------------- */
336     protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
337     {
338         return new HttpConnection(SelectChannelConnector.this,endpoint,getServer());
339     }
340 
341     /* ------------------------------------------------------------ */
342     /* ------------------------------------------------------------ */
343     /* ------------------------------------------------------------ */
344     public static class ConnectorEndPoint extends SelectChannelEndPoint
345     {
346         public ConnectorEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
347         {
348             super(channel,selectSet,key);
349             scheduleIdle();
350         }
351 
352         public void close() throws IOException
353         {
354             Connection con=getConnection();
355             if (con instanceof HttpConnection)
356             {
357                 RetryContinuation continuation = (RetryContinuation) ((HttpConnection)getConnection()).getRequest().getContinuation();
358                 if (continuation != null && continuation.isPending())
359                     continuation.reset();
360             }
361 
362             super.close();
363         }
364 
365         /* ------------------------------------------------------------ */
366         public void undispatch()
367         {
368             Connection con=getConnection();
369             if (con instanceof HttpConnection)
370             {
371                 RetryContinuation continuation = (RetryContinuation) ((HttpConnection)getConnection()).getRequest().getContinuation();
372 
373                 if (continuation != null)
374                 {
375                     // We have a continuation
376                     Log.debug("continuation {}", continuation);
377                     if (continuation.undispatch())
378                         super.undispatch();
379                 }
380                 else
381                 {
382                     super.undispatch();
383                 }
384             }
385             else
386                 super.undispatch();
387         }
388     }
389 
390     /* ------------------------------------------------------------ */
391     /* ------------------------------------------------------------ */
392     /* ------------------------------------------------------------ */
393     public static class RetryContinuation extends Timeout.Task implements Continuation, Runnable
394     {
395         SelectChannelEndPoint _endPoint=(SelectChannelEndPoint)HttpConnection.getCurrentConnection().getEndPoint();
396         boolean _new = true;
397         Object _object;
398         boolean _pending = false;   // waiting for resume or timeout
399         boolean _resumed = false;   // resume called.
400         boolean _parked =false;     // end point dispatched, but undispatch called.
401         RetryRequest _retry;
402         long _timeout;
403 
404         
405         public Object getObject()
406         {
407             return _object;
408         }
409 
410         public long getTimeout()
411         {
412             return _timeout;
413         }
414 
415         public boolean isNew()
416         {
417             return _new;
418         }
419 
420         public boolean isPending()
421         {
422             return _pending;
423         }
424 
425         public boolean isResumed()
426         {
427             return _resumed;
428         }
429 
430         public void reset()
431         {
432             synchronized (this)
433             {
434                 _resumed = false;
435                 _pending = false;
436                 _parked = false;
437             }
438             
439             synchronized (_endPoint.getSelectSet())
440             {
441                 this.cancel();   
442             }
443         } 
444 
445         public boolean suspend(long timeout)
446         {
447             boolean resumed=false;
448             synchronized (this)
449             {
450                 resumed=_resumed;
451                 _resumed=false;
452                 _new = false;
453                 if (!_pending && !resumed && timeout >= 0)
454                 {
455                     _pending=true;
456                     _parked = false;
457                     _timeout = timeout;
458                     if (_retry==null)
459                      _retry = new RetryRequest();
460                     throw _retry;
461                 }
462                 
463                 // here only if suspend called on pending continuation.
464                 // acts like a reset
465                 _resumed = false;
466                 _pending = false;
467                 _parked =false;
468             }
469 
470             synchronized (_endPoint.getSelectSet())
471             {
472                 this.cancel();   
473             }
474 
475             return resumed;
476         }
477         
478         public void resume()
479         {
480             boolean redispatch=false;
481             synchronized (this)
482             {
483                 if (_pending && !isExpired())
484                 {
485                     _resumed = true;
486                     redispatch=_parked;
487                     _parked=false;
488                 }
489             }
490 
491             if (redispatch)
492             {
493                 SelectSet selectSet = _endPoint.getSelectSet();
494                 
495                 synchronized (selectSet)
496                 {
497                     this.cancel();   
498                 }
499 
500                 _endPoint.scheduleIdle();  // TODO maybe not needed?
501                 selectSet.addChange(this);
502                 selectSet.wakeup();
503             }
504         }
505         
506         public void expire()
507         {
508             boolean redispatch=false;
509             synchronized (this)
510             {
511                 redispatch=_parked && _pending && !_resumed;
512                 _parked=false;
513             }
514             if (redispatch)
515             {
516                 _endPoint.scheduleIdle();  // TODO maybe not needed?
517                 _endPoint.getSelectSet().addChange(this);
518                 _endPoint.getSelectSet().wakeup();
519             }
520         }
521 
522         
523         public void run()
524         {
525             _endPoint.run();
526         }
527         
528         /* undispatch continuation.
529          * Called when an endppoint is undispatched.  
530          * Either sets timeout or dispatches if already resumed or expired */
531         public boolean undispatch()
532         {
533             boolean redispatch=false;
534         
535             synchronized (this)
536             {
537                 if (!_pending)
538                     return true;
539                 
540                 redispatch=isExpired() || _resumed;
541                 _parked=!redispatch;
542             }
543             
544             if (redispatch)
545             {
546                 _endPoint.scheduleIdle();
547                 _endPoint.getSelectSet().addChange(this);
548             }
549             else if (_timeout>0)
550                 _endPoint.getSelectSet().scheduleTimeout(this,_timeout);
551             
552             _endPoint.getSelectSet().wakeup();
553             return false;
554         }
555 
556         public void setObject(Object object)
557         {
558             _object = object;
559         }
560         
561         public String toString()
562         {
563             synchronized (this)
564             {
565                 return "RetryContinuation@"+hashCode()+
566                 (_new?",new":"")+
567                 (_pending?",pending":"")+
568                 (_resumed?",resumed":"")+
569                 (isExpired()?",expired":"")+
570                 (_parked?",parked":"");
571             }
572         }
573 
574     }
575 
576 }