1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.servlet;
15
16 import java.io.IOException;
17 import java.util.Queue;
18 import java.util.concurrent.Semaphore;
19 import java.util.concurrent.TimeUnit;
20
21 import javax.servlet.Filter;
22 import javax.servlet.FilterChain;
23 import javax.servlet.FilterConfig;
24 import javax.servlet.ServletContext;
25 import javax.servlet.ServletException;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.servlet.http.HttpSession;
31
32 import org.mortbay.util.ArrayQueue;
33 import org.mortbay.util.ajax.Continuation;
34 import org.mortbay.util.ajax.ContinuationSupport;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class QoSFilter implements Filter
72 {
73 final static int __DEFAULT_MAX_PRIORITY=10;
74 final static int __DEFAULT_PASSES=10;
75 final static int __DEFAULT_WAIT_MS=50;
76 final static long __DEFAULT_TIMEOUT_MS = 30000L;
77
78 final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
79 final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
80 final static String MAX_WAIT_INIT_PARAM="maxWaitMs";
81 final static String SUSPEND_INIT_PARAM="suspendMs";
82
83 ServletContext _context;
84 long _waitMs;
85 long _suspendMs;
86 Semaphore _passes;
87 Queue<Continuation>[] _queue;
88 String _suspended = "QoSFilter@" + this.hashCode();
89 String _continuation = "org.mortbay.jetty.ajax.Continuation";
90
91 public void init(FilterConfig filterConfig)
92 {
93 _context=filterConfig.getServletContext();
94
95 int max_priority=__DEFAULT_MAX_PRIORITY;
96 if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
97 max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
98 _queue=new Queue[max_priority+1];
99 for (int p=0;p<_queue.length;p++)
100 _queue[p]=new ArrayQueue<Continuation>();
101
102 int passes=__DEFAULT_PASSES;
103 if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
104 passes=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
105 _passes=new Semaphore(passes,true);
106
107 long wait = __DEFAULT_WAIT_MS;
108 if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
109 wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
110 _waitMs=wait;
111
112 long suspend = __DEFAULT_TIMEOUT_MS;
113 if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
114 suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
115 _suspendMs=suspend;
116 }
117
118 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
119 {
120
121 boolean accepted = false;
122 try
123 {
124 Boolean suspended = (Boolean)request.getAttribute(_suspended);
125 if (suspended == null)
126 {
127 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
128
129 if (accepted)
130 {
131 request.setAttribute(_suspended,Boolean.FALSE);
132 }
133 else
134 {
135 Continuation continuation = ContinuationSupport.getContinuation((HttpServletRequest)request,_queue);
136 int priority = getPriority(request);
137 suspended=Boolean.TRUE;
138 request.setAttribute(_suspended,suspended);
139 synchronized (_queue)
140 {
141 _queue[priority].add(continuation);
142 continuation.suspend(_suspendMs);
143
144 }
145 }
146 }
147
148 if (suspended!=null && suspended.booleanValue())
149 {
150 request.setAttribute(_suspended,Boolean.FALSE);
151 Continuation continuation = ContinuationSupport.getContinuation((HttpServletRequest)request,_queue);
152 if (continuation.isResumed())
153 {
154 _passes.acquire();
155 accepted = true;
156 }
157 else
158 {
159
160 accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
161 }
162 }
163 else if (!accepted)
164 {
165
166 _passes.acquire();
167 accepted = true;
168 }
169
170 if (accepted)
171 {
172 chain.doFilter(request,response);
173 }
174 else
175 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
176
177 }
178 catch (InterruptedException e)
179 {
180 _context.log("QoS",e);
181 ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
182 }
183 finally
184 {
185 if (accepted)
186 {
187 synchronized (_queue)
188 {
189 for (int p = _queue.length; p-- > 0;)
190 {
191 Continuation continuation = _queue[p].poll();
192 if (continuation != null)
193 {
194 continuation.resume();
195 break;
196 }
197 }
198 }
199 _passes.release();
200 }
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213
214
215
216 protected int getPriority(ServletRequest request)
217 {
218 HttpServletRequest base_request = (HttpServletRequest)request;
219 if (base_request.getUserPrincipal() != null)
220 return 2;
221 else
222 {
223 HttpSession session = base_request.getSession(false);
224 if (session != null && !session.isNew())
225 return 1;
226 else
227 return 0;
228 }
229 }
230
231 public void destroy()
232 {
233 }
234
235 }