View Javadoc

1   // ========================================================================
2   // Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  package org.mortbay.servlet;
15  
16  import java.io.IOException;
17  import java.util.Queue;
18  import java.util.concurrent.Semaphore;
19  import java.util.concurrent.TimeUnit;
20  
21  import javax.servlet.Filter;
22  import javax.servlet.FilterChain;
23  import javax.servlet.FilterConfig;
24  import javax.servlet.ServletContext;
25  import javax.servlet.ServletException;
26  import javax.servlet.ServletRequest;
27  import javax.servlet.ServletResponse;
28  import javax.servlet.http.HttpServletRequest;
29  import javax.servlet.http.HttpServletResponse;
30  import javax.servlet.http.HttpSession;
31  
32  import org.mortbay.util.ArrayQueue;
33  import org.mortbay.util.ajax.Continuation;
34  import org.mortbay.util.ajax.ContinuationSupport;
35  
36  /**
37   * Quality of Service Filter. 
38   * 
39   * <p>This filter limits the number of active requests
40   * to the number set by the "maxRequests" init parameter (default 10). If more
41   * requests are received, they are suspended and placed on priority queues.
42   * Priorities are determined by the {@link #getPriority(ServletRequest)} method
43   * and are a value between 0 and the value given by the "maxPriority" init
44   * parameter (default 10), with higher values having higher priority.
45   * <p>
46   * This filter is ideal to prevent wasting threads waiting for slow/limited 
47   * resources such as a JDBC connection pool.  It avoids the situation where all of a 
48   * containers thread pool may be consumed blocking on such a slow resource.
49   * By limiting the number of active threads, a smaller thread pool may be used as 
50   * the threads are not wasted waiting.  Thus more memory may be available for use by 
51   * the active threads.
52   * <p>
53   * Furthermore, this filter uses a priority when resuming waiting requests. So that if
54   * a container is under load, and there are many requests waiting for resources,
55   * the {@link #getPriority(ServletRequest)} method is used, so that more important 
56   * requests are serviced first.     For example, this filter could be deployed with a 
57   * maxRequest limit slightly smaller than the containers thread pool and a high priority 
58   * allocated to admin users.  Thus regardless of load, admin users would always be
59   * able to access the web application.
60   * <p>
61   * The maxRequest limit is policed by a {@link Semaphore} and the filter will
62   * wait a short while attempting to acquire the semaphore. This wait is
63   * controlled by the "waitMs" init parameter and allows the expense of a suspend
64   * to be avoided if the semaphore is shortly available. If the semaphore cannot be 
65   * obtained, the request will be suspended for the default suspend period of the 
66   * container or the valued set as the "suspendMs" init parameter.
67   * 
68   * @author gregw
69   * 
70   */
71  public class QoSFilter implements Filter
72  {
73      final static int __DEFAULT_MAX_PRIORITY=10;
74      final static int __DEFAULT_PASSES=10;
75      final static int __DEFAULT_WAIT_MS=50;
76      final static long __DEFAULT_TIMEOUT_MS = 30000L;
77      
78      final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
79      final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
80      final static String MAX_WAIT_INIT_PARAM="maxWaitMs";
81      final static String SUSPEND_INIT_PARAM="suspendMs";
82  
83      ServletContext _context;
84      long _waitMs;
85      long _suspendMs;
86      Semaphore _passes;
87      Queue<Continuation>[] _queue;
88      String _suspended = "QoSFilter@" + this.hashCode();
89      String _continuation = "org.mortbay.jetty.ajax.Continuation";
90  
91      public void init(FilterConfig filterConfig) 
92      {
93          _context=filterConfig.getServletContext();
94          
95          int max_priority=__DEFAULT_MAX_PRIORITY;
96          if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
97              max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
98          _queue=new Queue[max_priority+1];
99          for (int p=0;p<_queue.length;p++)
100             _queue[p]=new ArrayQueue<Continuation>();
101         
102         int passes=__DEFAULT_PASSES;
103         if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
104             passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
105         _passes=new Semaphore(passes,true);
106         
107         long wait = __DEFAULT_WAIT_MS;
108         if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
109             wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
110         _waitMs=wait;
111         
112         long suspend = __DEFAULT_TIMEOUT_MS;
113         if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
114             suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
115         _suspendMs=suspend;
116     }
117 
118     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
119     {
120         
121         boolean accepted = false;
122         try
123         {
124             Boolean suspended = (Boolean)request.getAttribute(_suspended);
125             if (suspended == null)
126             {
127                 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
128 
129                 if (accepted)
130                 {
131                     request.setAttribute(_suspended,Boolean.FALSE);
132                 }
133                 else
134                 {
135                     Continuation continuation =  ContinuationSupport.getContinuation((HttpServletRequest)request,_queue);
136                     int priority = getPriority(request);
137                     suspended=Boolean.TRUE;
138                     request.setAttribute(_suspended,suspended);
139                     synchronized (_queue)
140                     {
141                         _queue[priority].add(continuation);
142                         continuation.suspend(_suspendMs);
143                         // may fall through here if waiting continuation
144                     }
145                 }
146             }
147              
148             if (suspended!=null && suspended.booleanValue())
149             {
150                 request.setAttribute(_suspended,Boolean.FALSE);
151                 Continuation continuation =  ContinuationSupport.getContinuation((HttpServletRequest)request,_queue);
152                 if (continuation.isResumed())
153                 {
154                     _passes.acquire();
155                     accepted = true;
156                 }
157                 else
158                 {
159                     // Timeout! try 1 more time.
160                     accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
161                 }
162             }
163             else if (!accepted)
164             {
165                 // pass through resume of previously accepted request
166                 _passes.acquire();
167                 accepted = true;
168             }
169 
170             if (accepted)
171             {
172                 chain.doFilter(request,response);
173             }
174             else
175                 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
176             
177         }
178         catch (InterruptedException e)
179         {
180             _context.log("QoS",e);
181             ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
182         }
183         finally
184         {
185             if (accepted)
186             {
187                 synchronized (_queue)
188                 {
189                     for (int p = _queue.length; p-- > 0;)
190                     {
191                         Continuation continuation = _queue[p].poll();
192                         if (continuation != null)
193                         {
194                             continuation.resume();
195                             break;
196                         }
197                     }
198                 }
199                 _passes.release();
200             }
201         }
202     }
203 
204     /**
205      * Get the request Priority.
206      * <p> The default implementation assigns the following priorities:<ul>
207      * <li> 2 - for a authenticated request
208      * <li> 1 - for a request with valid /non new session 
209      * <li> 0 - for all other requests.
210      * </ul>
211      * This method may be specialised to provide application specific priorities.
212      * 
213      * @param request
214      * @return a value between 0 and the maximim priority
215      */
216     protected int getPriority(ServletRequest request)
217     {
218         HttpServletRequest base_request = (HttpServletRequest)request;
219         if (base_request.getUserPrincipal() != null)
220             return 2;
221         else
222         {
223             HttpSession session = base_request.getSession(false);
224             if (session != null && !session.isNew())
225                 return 1;
226             else
227                 return 0;
228         }
229     }
230 
231     public void destroy()
232     {
233     }
234 
235 }