1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.continuation;
16
17 import java.io.IOException;
18 import java.nio.ByteBuffer;
19 import java.util.ArrayList;
20 import java.util.List;
21 import javax.servlet.ServletException;
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.cometd.Bayeux;
26 import org.cometd.Client;
27 import org.cometd.Message;
28 import org.mortbay.cometd.AbstractBayeux;
29 import org.mortbay.cometd.AbstractCometdServlet;
30 import org.mortbay.cometd.ClientImpl;
31 import org.mortbay.cometd.JSONTransport;
32 import org.mortbay.cometd.MessageImpl;
33 import org.mortbay.cometd.Transport;
34 import org.mortbay.util.ArrayQueue;
35 import org.mortbay.util.StringUtil;
36 import org.mortbay.util.ajax.Continuation;
37 import org.mortbay.util.ajax.ContinuationSupport;
38
39 public class ContinuationCometdServlet extends AbstractCometdServlet
40 {
41 public final static int __DEFAULT_REFS_THRESHOLD=0;
42 protected int _refsThreshold=__DEFAULT_REFS_THRESHOLD;
43 String _responseBuffer;
44
45 @Override
46 public void init() throws ServletException
47 {
48 String refsThreshold=getInitParameter("refsThreshold");
49 if (refsThreshold != null)
50 _refsThreshold=Integer.parseInt(refsThreshold);
51
52 if (_refsThreshold>0)
53 {
54 String server = getServletContext().getServerInfo();
55 if (server.startsWith("jetty/6"))
56 _responseBuffer="org.mortbay.jetty.ResponseBuffer";
57 else if (server.startsWith("jetty/"))
58 _responseBuffer="org.eclipse.jetty.server.ResponseBuffer";
59 else
60 _refsThreshold=0;
61 }
62
63 super.init();
64 }
65
66
67 @Override
68 protected AbstractBayeux newBayeux()
69 {
70 return new ContinuationBayeux();
71 }
72
73
74 @Override
75 protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
76 {
77 if (response.isCommitted())
78 {
79
80
81
82 return;
83 }
84
85
86 Object clientObj=request.getAttribute(CLIENT_ATTR);
87 Transport transport=null;
88 int received=-1;
89 boolean metaConnectDeliveryOnly=false;
90 boolean pendingResponse=false;
91 boolean metaConnect=false;
92 final boolean initial;
93
94
95 ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
96 if (client != null)
97 {
98 initial=false;
99
100 transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
101 transport.setResponse(response);
102 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
103 metaConnect=true;
104 }
105 else
106 {
107 initial=true;
108 Message[] messages=getMessages(request);
109 received=messages.length;
110
111
112 String jsonpParam=request.getParameter("jsonp");
113
114
115 try
116 {
117 for (Message message : messages)
118 {
119 if (jsonpParam != null)
120 message.put("jsonp",jsonpParam);
121
122 if (client == null)
123 {
124 String clientId = (String)message.get(AbstractBayeux.CLIENT_FIELD);
125 client=(ContinuationClient)_bayeux.getClient(clientId);
126
127
128
129 if (client == null)
130 {
131
132 String browser_id=findBrowserId(request);
133 if (browser_id == null)
134 browser_id=setBrowserId(request,response);
135
136 if (transport == null)
137 {
138 transport=_bayeux.newTransport(client,message);
139 transport.setResponse(response);
140 metaConnectDeliveryOnly=transport.isMetaConnectDeliveryOnly();
141 }
142 _bayeux.handle(null,transport,message);
143 message=null;
144 continue;
145 }
146 }
147
148 String browser_id=findBrowserId(request);
149 if (browser_id != null && (client.getBrowserId() == null || !client.getBrowserId().equals(browser_id)))
150 client.setBrowserId(browser_id);
151
152
153 if (transport == null)
154 {
155 transport=_bayeux.newTransport(client,message);
156 transport.setResponse(response);
157 metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
158 }
159
160
161
162 if (!metaConnectDeliveryOnly && !pendingResponse)
163 {
164 pendingResponse=true;
165 client.responsePending();
166 }
167
168 if (Bayeux.META_CONNECT.equals(message.getChannel()))
169 metaConnect=true;
170
171 _bayeux.handle(client,transport,message);
172 }
173 }
174 finally
175 {
176 for (Message message : messages)
177 ((MessageImpl)message).decRef();
178 if (pendingResponse)
179 {
180 client.responded();
181 }
182 }
183 }
184
185 Message metaConnectReply=null;
186
187
188 if (transport != null)
189 {
190 metaConnectReply=transport.getMetaConnectReply();
191 if (metaConnectReply != null)
192 {
193 long timeout=client.getTimeout();
194 if (timeout < 0)
195 timeout=_bayeux.getTimeout();
196
197 Continuation continuation=ContinuationSupport.getContinuation(request,client);
198
199
200 synchronized(client)
201 {
202 if (timeout > 0 && !client.hasNonLazyMessages() && initial && received <= 1)
203 {
204
205 request.setAttribute(CLIENT_ATTR,client);
206 request.setAttribute(TRANSPORT_ATTR,transport);
207 client.setContinuation(continuation);
208 continuation.setObject(response);
209 continuation.suspend(timeout);
210 }
211
212 continuation.reset();
213 }
214
215 client.setContinuation(null);
216 transport.setMetaConnectReply(null);
217 }
218 else if (client != null)
219 {
220 client.access();
221 }
222 }
223
224 if (client != null)
225 {
226 if (metaConnectDeliveryOnly && !metaConnect)
227 {
228
229 client.resume();
230 }
231 else
232 {
233 List<Message> messages;
234 synchronized (client)
235 {
236 client.doDeliverListeners();
237
238 ArrayQueue<Message> clientMessages = (ArrayQueue<Message>)client.getQueue();
239
240 messages = new ArrayList<Message>(clientMessages);
241
242 clientMessages.clear();
243 }
244
245 final int size=messages.size();
246 for (int i=0; i < size; i++)
247 {
248 final Message message=messages.get(i);
249 final MessageImpl mesgImpl=(message instanceof MessageImpl)?(MessageImpl)message:null;
250
251
252 if (i == 0 && size == 1 && mesgImpl != null && _refsThreshold > 0 && metaConnectReply != null && transport instanceof JSONTransport)
253 {
254
255 ByteBuffer buffer=mesgImpl.getBuffer();
256 if (buffer != null)
257 {
258
259 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
260 if (metaConnectReply instanceof MessageImpl)
261 ((MessageImpl)metaConnectReply).decRef();
262 metaConnectReply=null;
263 transport=null;
264 mesgImpl.decRef();
265 continue;
266 }
267 else if (mesgImpl.getRefs() >= _refsThreshold)
268 {
269
270 byte[] contentBytes=("[" + mesgImpl.getJSON() + ",{\"" + Bayeux.SUCCESSFUL_FIELD + "\":true,\"" + Bayeux.CHANNEL_FIELD
271 + "\":\"" + Bayeux.META_CONNECT + "\"}]").getBytes(StringUtil.__UTF8);
272 int contentLength=contentBytes.length;
273
274 String headerString="HTTP/1.1 200 OK\r\n" + "Content-Type: text/json; charset=utf-8\r\n" + "Content-Length: "
275 + contentLength + "\r\n" + "\r\n";
276
277 byte[] headerBytes=headerString.getBytes(StringUtil.__UTF8);
278
279 buffer=ByteBuffer.allocateDirect(headerBytes.length + contentLength);
280 buffer.put(headerBytes);
281 buffer.put(contentBytes);
282 buffer.flip();
283
284 mesgImpl.setBuffer(buffer);
285 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
286 metaConnectReply=null;
287 if (metaConnectReply instanceof MessageImpl)
288 ((MessageImpl)metaConnectReply).decRef();
289 transport=null;
290 mesgImpl.decRef();
291 continue;
292 }
293 }
294
295 if (message != null)
296 transport.send(message);
297 if (mesgImpl != null)
298 mesgImpl.decRef();
299 }
300
301 if (metaConnectReply != null)
302 {
303 metaConnectReply=_bayeux.extendSendMeta(client,metaConnectReply);
304 transport.send(metaConnectReply);
305 if (metaConnectReply instanceof MessageImpl)
306 ((MessageImpl)metaConnectReply).decRef();
307 }
308 }
309 }
310
311 if (transport != null)
312 transport.complete();
313 }
314
315 public void destroy()
316 {
317 ContinuationBayeux bayeux = (ContinuationBayeux)_bayeux;
318 if (bayeux != null)
319 {
320 for (Client c : bayeux.getClients())
321 {
322 if (c instanceof ContinuationClient)
323 {
324 ContinuationClient client = (ContinuationClient)c;
325 client.setContinuation(null);
326 }
327 }
328 bayeux.destroy();
329 }
330 }
331 }