View Javadoc

1   //========================================================================
2   //Copyright 2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd.continuation;
16  
17  import java.io.IOException;
18  import java.nio.ByteBuffer;
19  
20  import javax.servlet.ServletException;
21  import javax.servlet.http.HttpServletRequest;
22  import javax.servlet.http.HttpServletResponse;
23  
24  import org.cometd.Bayeux;
25  import org.cometd.Extension;
26  import org.cometd.Message;
27  import org.mortbay.cometd.AbstractBayeux;
28  import org.mortbay.cometd.AbstractCometdServlet;
29  import org.mortbay.cometd.ClientImpl;
30  import org.mortbay.cometd.JSONTransport;
31  import org.mortbay.cometd.MessageImpl;
32  import org.mortbay.cometd.Transport;
33  import org.mortbay.util.ArrayQueue;
34  import org.mortbay.util.StringUtil;
35  import org.mortbay.util.ajax.Continuation;
36  import org.mortbay.util.ajax.ContinuationSupport;
37  
38  
39  public class ContinuationCometdServlet extends AbstractCometdServlet
40  {
41      /* ------------------------------------------------------------ */
42      @Override
43      protected AbstractBayeux newBayeux()
44      {
45          return new ContinuationBayeux();
46      }
47  
48      /* ------------------------------------------------------------ */
49      @Override
50      protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
51      {
52          // Look for an existing client and protect from context restarts
53          Object clientObj=request.getAttribute(CLIENT_ATTR);
54          ContinuationClient client=(clientObj instanceof ClientImpl)?(ContinuationClient)clientObj:null;
55          Transport transport=null;
56          boolean connect=false;
57          int received=-1;
58          
59          // Have we seen this request before
60          if (client!=null)
61          {
62              // yes - extract saved properties
63              transport=(Transport)request.getAttribute(TRANSPORT_ATTR);
64              transport.setResponse(response);
65          }
66          else
67          {
68              Message[] messages = getMessages(request);
69              received=messages.length;
70  
71              /* check jsonp parameter */
72              String jsonpParam=request.getParameter("jsonp");
73  
74              // Handle all messages
75              try
76              {
77                  for (Message message : messages)
78                  {
79                      if (jsonpParam!=null)
80                          message.put("jsonp",jsonpParam);
81  
82                      if (client==null)
83                      {   
84                          client=(ContinuationClient)_bayeux.getClient((String)message.get(AbstractBayeux.CLIENT_FIELD));
85  
86                          // If no client,  SHOULD be a handshake, so force a transport and handle
87                          if (client==null)
88                          {
89                              // Setup a browser ID
90                              String browser_id=browserId(request);
91                              if (browser_id==null)
92                                  browser_id=newBrowserId(request,response);
93  
94                              if (transport==null)
95                              {
96                                  transport=_bayeux.newTransport(client,message);
97                                  transport.setResponse(response);
98                              }
99                              _bayeux.handle(null,transport,message);
100                             message=null;
101 
102                             continue;
103                         }
104                         else
105                         {
106                             String browser_id=browserId(request);
107                             if (browser_id!=null && (client.getBrowserId()==null || !client.getBrowserId().equals(browser_id)))
108                                 client.setBrowserId(browser_id);
109 
110                             // resolve transport
111                             if (transport==null)
112                             {
113                                 transport=_bayeux.newTransport(client,message);
114                                 transport.setResponse(response);
115                             }
116 
117                             // Tell client to hold messages as a response is likely to be sent.
118                             if (!transport.resumePoll())
119                                 client.responsePending();
120                         }
121                     }
122 
123                     String channel=_bayeux.handle(client,transport,message);
124                     connect|=AbstractBayeux.META_CONNECT.equals(channel);
125                 }
126             }
127             finally
128             {
129                 if (transport!=null && client!=null && !transport.resumePoll())
130                     client.responded();
131                 
132                 for (Message message : messages)
133                     ((MessageImpl)message).decRef();
134             }
135         }
136 
137         Message pollReply=null;
138         // Do we need to wait for messages
139         if (transport!=null)
140         {
141             pollReply=transport.getPollReply();
142             if (pollReply!=null)
143             {
144                 if (_bayeux.isLogDebug())
145                     _bayeux.logDebug("doPost: transport is polling");
146                 long timeout=client.getTimeout();
147                 if (timeout==0)
148                     timeout=_bayeux.getTimeout();
149 
150                 Continuation continuation=ContinuationSupport.getContinuation(request,client);
151 
152                 // Get messages or wait
153                 synchronized (client)
154                 {
155                     if (!client.hasMessages() && !continuation.isPending()&& received<=1)
156                     {
157                         // save state and suspend
158                         ((ContinuationClient)client).setContinuation(continuation);
159                         request.setAttribute(CLIENT_ATTR,client);
160                         request.setAttribute(TRANSPORT_ATTR,transport);
161                         continuation.suspend(timeout);
162                     }
163                     
164                     if (!continuation.isPending())
165                         client.access();
166 
167                     continuation.reset();
168                 }
169 
170                 ((ContinuationClient)client).setContinuation(null);
171                 transport.setPollReply(null);
172 
173                 for (Extension e:_bayeux.getExtensions())
174                     pollReply=e.sendMeta(pollReply);
175             }
176             else if (client!=null)
177             {
178                 client.access();
179             }
180         }
181 
182         // Send any messages.
183         if (client!=null) 
184         { 
185             synchronized(client)
186             {
187                 client.doDeliverListeners();
188                 ArrayQueue<Message> messages= (ArrayQueue)client.getQueue();
189                 int size=messages.size();
190                 boolean flushed=false;
191                 
192                 try
193                 {
194                     if (pollReply!=null)
195                     {
196                         // can we bypass response generation?
197                         if (_refsThreshold>0 && size==1 && transport instanceof JSONTransport)
198                         {
199                             MessageImpl message = (MessageImpl)messages.peek();
200                             
201                             // is there a response already prepared
202                             ByteBuffer buffer = message.getBuffer();
203                             if (buffer!=null)
204                             {
205                                 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
206                                 ((MessageImpl)message).decRef();
207                                 flushed=true;
208                             }
209                             else if (message.getRefs()>=_refsThreshold)
210                             {                                
211                                 byte[] contentBytes = ("[{\""+Bayeux.SUCCESSFUL_FIELD+"\":true,\""+
212                                         Bayeux.CHANNEL_FIELD+"\":\""+Bayeux.META_CONNECT+"\"},"+
213                                         message.getJSON()+"]").getBytes(StringUtil.__UTF8);
214                                 int contentLength = contentBytes.length;
215 
216                                 String headerString = "HTTP/1.1 200 OK\r\n"+
217                                 "Content-Type: text/json; charset=utf-8\r\n" +
218                                 "Content-Length: " + contentLength + "\r\n" +
219                                 "\r\n";
220 
221                                 byte[] headerBytes = headerString.getBytes(StringUtil.__UTF8);
222 
223                                 buffer = ByteBuffer.allocateDirect(headerBytes.length+contentLength);
224                                 buffer.put(headerBytes);
225                                 buffer.put(contentBytes);
226                                 buffer.flip();
227 
228                                 message.setBuffer(buffer);
229                                 request.setAttribute("org.mortbay.jetty.ResponseBuffer",buffer);
230                                 ((MessageImpl)message).decRef();
231                                 flushed=true;
232                             }
233                             else
234                                 transport.send(pollReply);
235                         }
236                         else
237                             transport.send(pollReply);                        
238                     }
239                     
240                     if (!flushed)
241                     {
242                         Message message = null;
243                         for (int i = 0;i<size;i++)
244                         {
245                             message=messages.getUnsafe(i);
246                             transport.send(message);
247                         }
248 
249                         transport.complete();
250                         flushed=true;
251                     }
252                 }
253                 finally
254                 {
255                     if (flushed)
256                         messages.clear();
257                 }
258             }
259             
260             if (transport.resumePoll())
261                 client.resume();
262         }
263         else if (transport!=null)
264         {
265             transport.complete();
266         }   
267     }
268 }