1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.continuation;
16
17 import java.io.IOException;
18 import java.nio.ByteBuffer;
19
20 import javax.servlet.ServletException;
21 import javax.servlet.http.HttpServletRequest;
22 import javax.servlet.http.HttpServletResponse;
23
24 import org.cometd.Bayeux;
25 import org.cometd.Extension;
26 import org.cometd.Message;
27 import org.mortbay.cometd.AbstractBayeux;
28 import org.mortbay.cometd.AbstractCometdServlet;
29 import org.mortbay.cometd.ClientImpl;
30 import org.mortbay.cometd.JSONTransport;
31 import org.mortbay.cometd.MessageImpl;
32 import org.mortbay.cometd.Transport;
33 import org.mortbay.util.ArrayQueue;
34 import org.mortbay.util.StringUtil;
35 import org.mortbay.util.ajax.Continuation;
36 import org.mortbay.util.ajax.ContinuationSupport;
37
38
39 public class ContinuationCometdServlet extends AbstractCometdServlet
40 {
41
42 @Override
43 protected AbstractBayeux newBayeux()
44 {
45 return new ContinuationBayeux();
46 }
47
48
49 @Override
50 protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
51 {
52
53 Object clientObj=request.getAttribute(CLIENT_ATTR);
54 ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
55 Transport transport=null;
56 boolean connect=false;
57 int received=-1;
58
59
60 if (client!=null)
61 {
62
63 transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
64 transport.setResponse(response);
65 }
66 else
67 {
68 Message[] messages = getMessages(request);
69 received=messages.length;
70
71
72 String jsonpParam=request.getParameter("jsonp");
73
74
75 try
76 {
77 for (Message message : messages)
78 {
79 if (jsonpParam!=null)
80 message.put("jsonp",jsonpParam);
81
82 if (client==null)
83 {
84 client=(ContinuationClient)_bayeux.getClient((String)message.get(AbstractBayeux.CLIENT_FIELD));
85
86
87 if (client==null)
88 {
89
90 String browser_id=browserId(request);
91 if (browser_id==null)
92 browser_id=newBrowserId(request,response);
93
94 if (transport==null)
95 {
96 transport=_bayeux.newTransport(client,message);
97 transport.setResponse(response);
98 }
99 _bayeux.handle(null,transport,message);
100 message=null;
101
102 continue;
103 }
104 else
105 {
106 String browser_id=browserId(request);
107 if (browser_id!=null && (client.getBrowserId()==null || !client.getBrowserId().equals(browser_id)))
108 client.setBrowserId(browser_id);
109
110
111 if (transport==null)
112 {
113 transport=_bayeux.newTransport(client,message);
114 transport.setResponse(response);
115 }
116
117
118 if (!transport.resumePoll())
119 client.responsePending();
120 }
121 }
122
123 String channel=_bayeux.handle(client,transport,message);
124 connect|=AbstractBayeux.META_CONNECT.equals(channel);
125 }
126 }
127 finally
128 {
129 if (transport!=null && client!=null && !transport.resumePoll())
130 client.responded();
131
132 for (Message message : messages)
133 ((MessageImpl)message).decRef();
134 }
135 }
136
137 Message pollReply=null;
138
139 if (transport!=null)
140 {
141 pollReply=transport.getPollReply();
142 if (pollReply!=null)
143 {
144 if (_bayeux.isLogDebug())
145 _bayeux.logDebug("doPost: transport is polling");
146 long timeout=client.getTimeout();
147 if (timeout==0)
148 timeout=_bayeux.getTimeout();
149
150 Continuation continuation=ContinuationSupport.getContinuation(request,client);
151
152
153 synchronized (client)
154 {
155 if (!client.hasMessages() && !continuation.isPending()&& received<=1)
156 {
157
158 ((ContinuationClient)client).setContinuation(continuation);
159 request.setAttribute(CLIENT_ATTR,client);
160 request.setAttribute(TRANSPORT_ATTR,transport);
161 continuation.suspend(timeout);
162 }
163
164 if (!continuation.isPending())
165 client.access();
166
167 continuation.reset();
168 }
169
170 ((ContinuationClient)client).setContinuation(null);
171 transport.setPollReply(null);
172
173 for (Extension e:_bayeux.getExtensions())
174 pollReply=e.sendMeta(pollReply);
175 }
176 else if (client!=null)
177 {
178 client.access();
179 }
180 }
181
182
183 if (client!=null)
184 {
185 synchronized(client)
186 {
187 client.doDeliverListeners();
188 ArrayQueue<Message> messages= (ArrayQueue)client.getQueue();
189 int size=messages.size();
190 boolean flushed=false;
191
192 try
193 {
194 if (pollReply!=null)
195 {
196
197 if (_refsThreshold>0 && size==1 && transport instanceof JSONTransport)
198 {
199 MessageImpl message = (MessageImpl)messages.peek();
200
201
202 ByteBuffer buffer = message.getBuffer();
203 if (buffer!=null)
204 {
205 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
206 ((MessageImpl)message).decRef();
207 flushed=true;
208 }
209 else if (message.getRefs()>=_refsThreshold)
210 {
211 byte[] contentBytes = ("[{\""+Bayeux.SUCCESSFUL_FIELD+"\":true,\""+
212 Bayeux.CHANNEL_FIELD+"\":\""+Bayeux.META_CONNECT+"\"},"+
213 message.getJSON()+"]").getBytes(StringUtil.__UTF8);
214 int contentLength = contentBytes.length;
215
216 String headerString = "HTTP/1.1 200 OK\r\n"+
217 "Content-Type: text/json; charset=utf-8\r\n" +
218 "Content-Length: " + contentLength + "\r\n" +
219 "\r\n";
220
221 byte[] headerBytes = headerString.getBytes(StringUtil.__UTF8);
222
223 buffer = ByteBuffer.allocateDirect(headerBytes.length+contentLength);
224 buffer.put(headerBytes);
225 buffer.put(contentBytes);
226 buffer.flip();
227
228 message.setBuffer(buffer);
229 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
230 ((MessageImpl)message).decRef();
231 flushed=true;
232 }
233 else
234 transport.send(pollReply);
235 }
236 else
237 transport.send(pollReply);
238 }
239
240 if (!flushed)
241 {
242 Message message = null;
243 for (int i = 0;i<size;i++)
244 {
245 message=messages.getUnsafe(i);
246 transport.send(message);
247 }
248
249 transport.complete();
250 flushed=true;
251 }
252 }
253 finally
254 {
255 if (flushed)
256 messages.clear();
257 }
258 }
259
260 if (transport.resumePoll())
261 client.resume();
262 }
263 else if (transport!=null)
264 {
265 transport.complete();
266 }
267 }
268 }