View Javadoc

1   //========================================================================
2   //Copyright 2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd.continuation;
16  
17  import java.io.IOException;
18  import java.nio.ByteBuffer;
19  import java.util.ArrayList;
20  import java.util.List;
21  import javax.servlet.ServletException;
22  import javax.servlet.http.HttpServletRequest;
23  import javax.servlet.http.HttpServletResponse;
24  
25  import org.cometd.Bayeux;
26  import org.cometd.Client;
27  import org.cometd.Message;
28  import org.mortbay.cometd.AbstractBayeux;
29  import org.mortbay.cometd.AbstractCometdServlet;
30  import org.mortbay.cometd.ClientImpl;
31  import org.mortbay.cometd.JSONTransport;
32  import org.mortbay.cometd.MessageImpl;
33  import org.mortbay.cometd.Transport;
34  import org.mortbay.util.ArrayQueue;
35  import org.mortbay.util.StringUtil;
36  import org.mortbay.util.ajax.Continuation;
37  import org.mortbay.util.ajax.ContinuationSupport;
38  
39  public class ContinuationCometdServlet extends AbstractCometdServlet
40  {
41      public final static int __DEFAULT_REFS_THRESHOLD=0;
42      protected int _refsThreshold=__DEFAULT_REFS_THRESHOLD;
43      String _responseBuffer;
44  
45      @Override
46      public void init() throws ServletException
47      {
48          String refsThreshold=getInitParameter("refsThreshold");
49          if (refsThreshold != null)
50              _refsThreshold=Integer.parseInt(refsThreshold);
51  
52          if (_refsThreshold>0)
53          {
54              String server = getServletContext().getServerInfo();
55              if (server.startsWith("jetty/6"))
56                  _responseBuffer="org.mortbay.jetty.ResponseBuffer";
57              else if (server.startsWith("jetty/"))
58                  _responseBuffer="org.eclipse.jetty.server.ResponseBuffer";
59              else
60                  _refsThreshold=0;
61          }
62  
63          super.init();
64      }
65  
66      /* ------------------------------------------------------------ */
67      @Override
68      protected AbstractBayeux newBayeux()
69      {
70          return new ContinuationBayeux();
71      }
72  
73      /* ------------------------------------------------------------ */
74      @Override
75      protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
76      {
77          if (response.isCommitted())
78          {
79              // return here as this is a resumed request that has already had a
80              // response sent - this is probably a sendError();resume(); used instead
81              // of a complete().
82              return;
83          }
84  
85          // Look for an existing client and protect from context restarts
86          Object clientObj=request.getAttribute(CLIENT_ATTR);
87          Transport transport=null;
88          int received=-1;
89          boolean metaConnectDeliveryOnly=false;
90          boolean pendingResponse=false;
91          boolean metaConnect=false;
92          final boolean initial;
93  
94          // Have we seen this request before
95          ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
96          if (client != null)
97          {
98              initial=false;
99              // yes - extract saved properties
100             transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
101             transport.setResponse(response);
102             metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
103             metaConnect=true;
104         }
105         else
106         {
107             initial=true;
108             Message[] messages=getMessages(request);
109             received=messages.length;
110 
111             /* check jsonp parameter */
112             String jsonpParam=request.getParameter("jsonp");
113 
114             // Handle all received messages
115             try
116             {
117                 for (Message message : messages)
118                 {
119                     if (jsonpParam != null)
120                         message.put("jsonp",jsonpParam);
121 
122                     if (client == null)
123                     {
124                         String clientId = (String)message.get(AbstractBayeux.CLIENT_FIELD);
125                         client=(ContinuationClient)_bayeux.getClient(clientId);
126 
127                         // If no client, SHOULD be a handshake, so force a
128                         // transport and handle
129                         if (client == null)
130                         {
131                             // Setup a browser ID
132                             String browser_id=findBrowserId(request);
133                             if (browser_id == null)
134                                 browser_id=setBrowserId(request,response);
135 
136                             if (transport == null)
137                             {
138                                 transport=_bayeux.newTransport(client,message);
139                                 transport.setResponse(response);
140                                 metaConnectDeliveryOnly=transport.isMetaConnectDeliveryOnly();
141                             }
142                             _bayeux.handle(null,transport,message);
143                             message=null;
144                             continue;
145                         }
146                     }
147 
148                     String browser_id=findBrowserId(request);
149                     if (browser_id != null && (client.getBrowserId() == null || !client.getBrowserId().equals(browser_id)))
150                         client.setBrowserId(browser_id);
151 
152                     // resolve transport
153                     if (transport == null)
154                     {
155                         transport=_bayeux.newTransport(client,message);
156                         transport.setResponse(response);
157                         metaConnectDeliveryOnly=client.isMetaConnectDeliveryOnly() || transport.isMetaConnectDeliveryOnly();
158                     }
159 
160                     // Tell client to hold messages as a response is likely to
161                     // be sent.
162                     if (!metaConnectDeliveryOnly && !pendingResponse)
163                     {
164                         pendingResponse=true;
165                         client.responsePending();
166                     }
167 
168                     if (Bayeux.META_CONNECT.equals(message.getChannel()))
169                         metaConnect=true;
170 
171                     _bayeux.handle(client,transport,message);
172                 }
173             }
174             finally
175             {
176                 for (Message message : messages)
177                     ((MessageImpl)message).decRef();
178                 if (pendingResponse)
179                 {
180                     client.responded();
181                 }
182             }
183         }
184 
185         Message metaConnectReply=null;
186 
187         // Do we need to wait for messages
188         if (transport != null)
189         {
190             metaConnectReply=transport.getMetaConnectReply();
191             if (metaConnectReply != null)
192             {
193                 long timeout=client.getTimeout();
194                 if (timeout < 0)
195                     timeout=_bayeux.getTimeout();
196 
197                 Continuation continuation=ContinuationSupport.getContinuation(request,client);
198 
199                 // Get messages or wait
200                 synchronized(client)
201                 {
202                     if (timeout > 0 && !client.hasNonLazyMessages() && initial && received <= 1)
203                     {
204                         // save state and suspend
205                         request.setAttribute(CLIENT_ATTR,client);
206                         request.setAttribute(TRANSPORT_ATTR,transport);
207                         client.setContinuation(continuation);
208                         continuation.setObject(response);
209                         continuation.suspend(timeout);
210                     }
211 
212                     continuation.reset();
213                 }
214 
215                 client.setContinuation(null);
216                 transport.setMetaConnectReply(null);
217             }
218             else if (client != null)
219             {
220                 client.access();
221             }
222         }
223 
224         if (client != null)
225         {
226             if (metaConnectDeliveryOnly && !metaConnect)
227             {
228                 // wake up any long poll
229                 client.resume();
230             }
231             else
232             {
233                 List<Message> messages;
234                 synchronized (client)
235                 {
236                     client.doDeliverListeners();
237 
238                     ArrayQueue<Message> clientMessages = (ArrayQueue<Message>)client.getQueue();
239                     // Copy the messages to avoid synchronization
240                     messages = new ArrayList<Message>(clientMessages);
241                     // Empty client's queue
242                     clientMessages.clear();
243                 }
244 
245                 final int size=messages.size();
246                 for (int i=0; i < size; i++)
247                 {
248                     final Message message=messages.get(i);
249                     final MessageImpl mesgImpl=(message instanceof MessageImpl)?(MessageImpl)message:null;
250 
251                     // Can we short cut the message?
252                     if (i == 0 && size == 1 && mesgImpl != null && _refsThreshold > 0 && metaConnectReply != null && transport instanceof JSONTransport)
253                     {
254                         // is there a response already prepared
255                         ByteBuffer buffer=mesgImpl.getBuffer();
256                         if (buffer != null)
257                         {
258                             // Send pre-prepared buffer
259                             request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
260                             if (metaConnectReply instanceof MessageImpl)
261                                 ((MessageImpl)metaConnectReply).decRef();
262                             metaConnectReply=null;
263                             transport=null;
264                             mesgImpl.decRef();
265                             continue;
266                         }
267                         else if (mesgImpl.getRefs() >= _refsThreshold)
268                         {
269                             // create multi-use buffer
270                             byte[] contentBytes=("[" + mesgImpl.getJSON() + ",{\"" + Bayeux.SUCCESSFUL_FIELD + "\":true,\"" + Bayeux.CHANNEL_FIELD
271                                     + "\":\"" + Bayeux.META_CONNECT + "\"}]").getBytes(StringUtil.__UTF8);
272                             int contentLength=contentBytes.length;
273 
274                             String headerString="HTTP/1.1 200 OK\r\n" + "Content-Type: text/json; charset=utf-8\r\n" + "Content-Length: "
275                                     + contentLength + "\r\n" + "\r\n";
276 
277                             byte[] headerBytes=headerString.getBytes(StringUtil.__UTF8);
278 
279                             buffer=ByteBuffer.allocateDirect(headerBytes.length + contentLength);
280                             buffer.put(headerBytes);
281                             buffer.put(contentBytes);
282                             buffer.flip();
283 
284                             mesgImpl.setBuffer(buffer);
285                             request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
286                             metaConnectReply=null;
287                             if (metaConnectReply instanceof MessageImpl)
288                                 ((MessageImpl)metaConnectReply).decRef();
289                             transport=null;
290                             mesgImpl.decRef();
291                             continue;
292                         }
293                     }
294 
295                     if (message != null)
296                         transport.send(message);
297                     if (mesgImpl != null)
298                         mesgImpl.decRef();
299                 }
300 
301                 if (metaConnectReply != null)
302                 {
303                     metaConnectReply=_bayeux.extendSendMeta(client,metaConnectReply);
304                     transport.send(metaConnectReply);
305                     if (metaConnectReply instanceof MessageImpl)
306                         ((MessageImpl)metaConnectReply).decRef();
307                 }
308             }
309         }
310 
311         if (transport != null)
312             transport.complete();
313     }
314 
315     public void destroy()
316     {
317         ContinuationBayeux bayeux = (ContinuationBayeux)_bayeux;
318         if (bayeux != null)
319         {
320             for (Client c : bayeux.getClients())
321             {
322                 if (c instanceof ContinuationClient)
323                 {
324                     ContinuationClient client = (ContinuationClient)c;
325                     client.setContinuation(null);
326                 }
327             }
328             bayeux.destroy();
329         }
330     }
331 }