1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mortbay.servlet;
20
21 import java.io.IOException;
22 import java.util.LinkedList;
23 import java.util.List;
24
25 import javax.servlet.Filter;
26 import javax.servlet.FilterChain;
27 import javax.servlet.FilterConfig;
28 import javax.servlet.ServletException;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.servlet.http.HttpServletResponse;
33
34 import org.mortbay.log.Log;
35 import org.mortbay.util.ajax.Continuation;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class ThrottlingFilter implements Filter
82 {
83
84 private int _maximum;
85 private int _current;
86 private long _queueTimeout;
87 private long _queueSize;
88 private final Object _lock;
89 private final List _queue;
90
91 public ThrottlingFilter()
92 {
93 _current = 0;
94 _lock = new Object();
95 _queue = new LinkedList();
96 }
97
98 public void init(FilterConfig filterConfig)
99 throws ServletException
100 {
101 _maximum = getIntegerParameter(filterConfig, "maximum", 10);
102 _queueTimeout = getIntegerParameter(filterConfig, "block", 5000);
103 _queueSize = getIntegerParameter(filterConfig, "queue", 500);
104
105 if (_queueTimeout == -1)
106 {
107 _queueTimeout = Integer.MAX_VALUE;
108 }
109
110 Log.debug("Config{maximum:" + _maximum + ", block:" + _queueTimeout + ", queue:" + _queueSize + "}", null, null);
111 }
112
113 private int getIntegerParameter(FilterConfig filterConfig, String name, int defaultValue)
114 throws ServletException
115 {
116 String value = filterConfig.getInitParameter(name);
117 if (value == null)
118 {
119 return defaultValue;
120 }
121 try
122 {
123 return Integer.parseInt(value);
124 }
125 catch (NumberFormatException e)
126 {
127 throw new ServletException("Parameter " + name + " must be a number (was " + value + " instead)");
128 }
129 }
130
131 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
132 throws IOException, ServletException
133 {
134 doFilter((HttpServletRequest) request, (HttpServletResponse) response, chain);
135 }
136
137 public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
138 throws IOException, ServletException
139 {
140 Continuation continuation = getContinuation(request);
141
142 boolean accepted = false;
143 try
144 {
145
146 accepted=acceptRequest();
147 if (!accepted)
148 {
149
150 if (continuation.isPending())
151 {
152 Log.debug("Request {} / {} was already queued, rejecting", request.getRequestURI(), continuation);
153 dropFromQueue(continuation);
154 continuation.reset();
155 }
156
157 else if (queueRequest(request, response, continuation))
158
159 accepted=acceptRequest();
160 }
161
162
163 if (accepted)
164 chain.doFilter(request, response);
165 else
166 rejectRequest(request, response);
167 }
168 finally
169 {
170 if (accepted)
171 {
172 releaseRequest();
173 popQueue();
174 }
175 }
176
177 }
178
179 private void dropFromQueue(Continuation continuation)
180 {
181 _queue.remove(continuation);
182 continuation.reset();
183 }
184
185 protected void rejectRequest(HttpServletRequest request, HttpServletResponse response) throws IOException
186 {
187 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Too many active connections to resource "
188 + request.getRequestURI());
189 }
190
191 private void popQueue()
192 {
193 Continuation continuation;
194 synchronized (_queue)
195 {
196 if (_queue.isEmpty())
197 {
198 return;
199 }
200 continuation = (Continuation) _queue.remove(0);
201 }
202 Log.debug("Resuming continuation {}", continuation, null);
203 continuation.resume();
204 }
205
206 private void releaseRequest()
207 {
208 synchronized (_lock)
209 {
210 _current--;
211 }
212 }
213
214 private boolean acceptRequest()
215 {
216 synchronized (_lock)
217 {
218 if (_current < _maximum)
219 {
220 _current++;
221 return true;
222 }
223 }
224 return false;
225 }
226
227 private boolean queueRequest(HttpServletRequest request, HttpServletResponse response, Continuation continuation) throws IOException,
228 ServletException
229 {
230 synchronized (_queue)
231 {
232 if (_queue.size() >= _queueSize)
233 {
234 Log.debug("Queue is full, rejecting request {}", request.getRequestURI(), null);
235 return false;
236 }
237
238 Log.debug("Queuing request {} / {}", request.getRequestURI(), continuation);
239 _queue.add(continuation);
240 }
241
242 continuation.suspend(_queueTimeout);
243 Log.debug("Resuming blocking continuation for request {}", request.getRequestURI(), null);
244 return true;
245 }
246
247 private Continuation getContinuation(ServletRequest request)
248 {
249 return (Continuation) request.getAttribute("org.mortbay.jetty.ajax.Continuation");
250 }
251
252 public void destroy()
253 {
254 _queue.clear();
255 }
256
257 }