View Javadoc

1   /* ------------------------------------------------------------------------
2    * $Id$
3    * Copyright 2006 Tim Vernum
4    * ------------------------------------------------------------------------
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   * ------------------------------------------------------------------------
17   */
18  
19  package org.mortbay.servlet;
20  
21  import java.io.IOException;
22  import java.util.LinkedList;
23  import java.util.List;
24  
25  import javax.servlet.Filter;
26  import javax.servlet.FilterChain;
27  import javax.servlet.FilterConfig;
28  import javax.servlet.ServletException;
29  import javax.servlet.ServletRequest;
30  import javax.servlet.ServletResponse;
31  import javax.servlet.http.HttpServletRequest;
32  import javax.servlet.http.HttpServletResponse;
33  
34  import org.mortbay.log.Log;
35  import org.mortbay.util.ajax.Continuation;
36  
37  /**
38   * This filter protects a web application from having to handle an unmanageable load. 
39   * <p>
40   * For servers where there is 1 application with standardized resource restrictions, then this affect can be easily
41   *  controlled by limiting the size of the {@link org.mortbay.jetty.Server#setThreadPool server thread pool}, however
42   *  where there are multiple applications, or a single application has different resource requirements for different
43   *  URLs, then this filter can assist in managing the number of requests being services at any point in time.
44   * <p>
45   * The filter has 3 configurable values, which may be set as init parameters:
46   * <OL>
47   *  <LI><code>maximum</code> determines the maxmimum number of requests that may be on the filter chain at any point in time.
48   *      <i>(See below for a more detailed explanation)</i></LI>
49   * <LI><code>block</code> determines how long (in milliseconds) a request will be queued before it is rejected. 
50   *      Set this to -1 to block indefinately.</LI>
51   * <LI><code>queue</code> determines how many requests can be queued simultaneously - any additional requests will be rejected.
52   *      Set this to 0 to turn off queueing.</LI>
53   * </OL>
54   * 
55   * <b>Request Counting</b>: The filter counts how many requests are currently being services by the rest of the filter chain
56   *  (including any servlets that may be configured to handle the request). Request counting is <i>per instance</i> of the filter.
57   *  There is no syncronization between virtual machines, and the request count is not shared between multiple instances of the filter.
58   *  Thus a web.xml file such as <pre>
59   *  &lt;filter&gt;&lt;filter-name&gt;throttle1&lt;/filter-name&gt;
60   *          &lt;filter-class&gt;org.adjective.spiral.filter.ThrottlingFilter&lt;/filter-class&gt;
61   *  &lt;filter&gt;&lt;filter-name&gt;throttle2&lt;/filter-name&gt;
62   *          &lt;filter-class&gt;org.adjective.spiral.filter.ThrottlingFilter&lt;/filter-class&gt;</pre>
63   *  creates 2 separate filters with individual request counts.
64   * <p>
65   * <b>Queueing</b>: When the number of active requests exceed the <code>maximum</code> requests will be queued. This queue regulates
66   *  the flow of connections. Once the number of requests on the queue reached the <code>queue</code> threshold, then any new requests
67   *  will be rejected. Requests are queued for a maximum of <code>block</code> milliseconds - is no capacity is made available in this
68   *  time then the request will be rejected. The oldest pending request is removed from the queue and processed as soon as the number
69   *  of pending requests falls below the <code>maximum</code> value (<i>i.e.</i> when a request is completed)
70   * <p> 
71   * <b>Rejection</b>: Requests are rejected when the number of requests in progress has reached <i>maximum</i> and either the queue
72   * is full; or a request has been queued for more than <code>block</code> milliseconds. The rejection is performed by calling the
73   * method {@link #rejectRequest}. By default this method sends the HTTP status code {@link HttpServletResponse#SC_SERVICE_UNAVAILABLE 503},
74   * but this may be over-ridden in derived classes. 
75   * <p>
76   * This filter works best with the {@link org.mortbay.jetty.nio.SelectChannelConnector}, as {@link org.mortbay.jetty.RetryRequest} based 
77   * {@link org.mortbay.util.ajax.Continuation}s can be used to free the thread and other resources associated with the queued requests.
78   * 
79   * @author - Tim Vernum
80   */
81  public class ThrottlingFilter implements Filter
82  {
83  
84      private int _maximum;
85      private int _current;
86      private long _queueTimeout;
87      private long _queueSize;
88      private final Object _lock;
89      private final List _queue;
90  
91      public ThrottlingFilter()
92      {
93          _current = 0;
94          _lock = new Object();
95          _queue = new LinkedList();
96      }
97  
98      public void init(FilterConfig filterConfig) 
99          throws ServletException
100     {
101         _maximum = getIntegerParameter(filterConfig, "maximum", 10);
102         _queueTimeout = getIntegerParameter(filterConfig, "block", 5000);
103         _queueSize = getIntegerParameter(filterConfig, "queue", 500);
104 
105         if (_queueTimeout == -1)
106         {
107             _queueTimeout = Integer.MAX_VALUE;
108         }
109 
110         Log.debug("Config{maximum:" + _maximum + ", block:" + _queueTimeout + ", queue:" + _queueSize + "}", null, null);
111     }
112 
113     private int getIntegerParameter(FilterConfig filterConfig, String name, int defaultValue) 
114         throws ServletException
115     {
116         String value = filterConfig.getInitParameter(name);
117         if (value == null)
118         {
119             return defaultValue;
120         }
121         try
122         {
123             return Integer.parseInt(value);
124         }
125         catch (NumberFormatException e)
126         {
127             throw new ServletException("Parameter " + name + " must be a number (was " + value + " instead)");
128         }
129     }
130 
131     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
132         throws IOException, ServletException
133     {
134         doFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
135     }
136 
137     public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) 
138         throws IOException, ServletException
139     {
140         Continuation continuation = getContinuation(request);
141 
142         boolean accepted = false;
143         try
144         {
145             // Is the request accepted?
146             accepted=acceptRequest();
147             if (!accepted)
148             {
149                 // Has the request been tried before?
150                 if (continuation.isPending())
151                 {
152                     Log.debug("Request {} / {} was already queued, rejecting", request.getRequestURI(), continuation);
153                     dropFromQueue(continuation);
154                     continuation.reset();
155                 }
156                 // No if we can queue the request
157                 else if (queueRequest(request, response, continuation))
158                     // Try to get it accepted again (after wait in queue).
159                     accepted=acceptRequest();
160             }
161             
162             // Handle if we are accepted, else reject
163             if (accepted)
164                 chain.doFilter(request, response);
165             else
166                 rejectRequest(request, response);
167         }
168         finally
169         {
170             if (accepted)
171             {
172                 releaseRequest();
173                 popQueue();
174             }
175         }
176 
177     }
178 
179     private void dropFromQueue(Continuation continuation)
180     {
181         _queue.remove(continuation);
182         continuation.reset();
183     }
184 
185     protected void rejectRequest(HttpServletRequest request, HttpServletResponse response) throws IOException
186     {
187         response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Too many active connections to resource "
188                 + request.getRequestURI());
189     }
190 
191     private void popQueue()
192     {
193         Continuation continuation;
194         synchronized (_queue)
195         {
196             if (_queue.isEmpty())
197             {
198                 return;
199             }
200             continuation = (Continuation) _queue.remove(0);
201         }
202         Log.debug("Resuming continuation {}", continuation, null);
203         continuation.resume();
204     }
205 
206     private void releaseRequest()
207     {
208         synchronized (_lock)
209         {
210             _current--;
211         }
212     }
213 
214     private boolean acceptRequest()
215     {
216         synchronized (_lock)
217         {
218             if (_current < _maximum)
219             {
220                 _current++;
221                 return true;
222             }
223         }
224         return false;
225     }
226 
227     private boolean queueRequest(HttpServletRequest request, HttpServletResponse response, Continuation continuation) throws IOException,
228             ServletException
229     {
230         synchronized (_queue)
231         {
232             if (_queue.size() >= _queueSize)
233             {
234                 Log.debug("Queue is full, rejecting request {}", request.getRequestURI(), null);
235                 return false;
236             }
237             
238             Log.debug("Queuing request {} / {}", request.getRequestURI(), continuation);
239             _queue.add(continuation);
240         }
241 
242         continuation.suspend(_queueTimeout);
243         Log.debug("Resuming blocking continuation for request {}", request.getRequestURI(), null);
244         return true;
245     }
246 
247     private Continuation getContinuation(ServletRequest request)
248     {
249         return (Continuation) request.getAttribute("org.mortbay.jetty.ajax.Continuation");
250     }
251 
252     public void destroy()
253     {
254         _queue.clear();
255     }
256 
257 }