View Javadoc

1   // ========================================================================
2   // Copyright 2006-2008 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd.client;
16  
17  import java.io.IOException;
18  import java.net.URLEncoder;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Queue;
24  import java.util.Timer;
25  import java.util.TimerTask;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  import javax.servlet.http.Cookie;
29  
30  import org.cometd.Bayeux;
31  import org.cometd.Client;
32  import org.cometd.ClientListener;
33  import org.cometd.Listener;
34  import org.cometd.Message;
35  import org.cometd.MessageListener;
36  import org.cometd.RemoveListener;
37  import org.mortbay.cometd.MessageImpl;
38  import org.mortbay.cometd.MessagePool;
39  import org.mortbay.io.Buffer;
40  import org.mortbay.io.ByteArrayBuffer;
41  import org.mortbay.jetty.HttpHeaders;
42  import org.mortbay.jetty.HttpSchemes;
43  import org.mortbay.jetty.client.Address;
44  import org.mortbay.jetty.client.HttpClient;
45  import org.mortbay.jetty.client.HttpExchange;
46  import org.mortbay.log.Log;
47  import org.mortbay.util.ArrayQueue;
48  import org.mortbay.util.QuotedStringTokenizer;
49  import org.mortbay.util.ajax.JSON;
50  
51  
52  /* ------------------------------------------------------------ */
53  /** Bayeux protocol Client.
54   * <p>
55   * Implements a Bayeux Ajax Push client as part of the cometd project.
56   *
57   * @see <a href="http://cometd.com">http://cometd.com</a>
58   * @author gregw
59   *
60   */
61  public class BayeuxClient extends MessagePool implements Client
62  {
63      private HttpClient _client;
64      private Address _address;
65      private HttpExchange _pull;
66      private HttpExchange _push;
67      private String _uri="/cometd";
68      private boolean _initialized=false;
69      private boolean _disconnecting=false;
70      private String _clientId;
71      private Listener _listener;
72      private List<RemoveListener> _rListeners;
73      private List<MessageListener> _mListeners;
74      private Queue<Message> _inQ;  // queue of incoming messages used if no listener available. Used as the lock object for all incoming operations.
75      private Queue<Message> _outQ; // queue of outgoing messages. Used as the lock object for all outgoing operations.
76      private int _batch;
77      private boolean _formEncoded;
78      private Map<String, Cookie> _cookies=new ConcurrentHashMap<String, Cookie>();
79      private Advice _advice;
80      private Timer _timer;
81  
82      /* ------------------------------------------------------------ */
83      public BayeuxClient(HttpClient client, Address address, String uri, Timer timer) throws IOException
84      {
85          _client=client;
86          _address=address;
87          _uri=uri;
88  
89          _inQ=new ArrayQueue<Message>();
90          _outQ=new ArrayQueue<Message>();
91          
92          _timer = timer;
93          if (_timer == null)
94              _timer = new Timer("DefaultBayeuxClientTimer", true);
95      }
96      
97      public BayeuxClient(HttpClient client, Address address, String uri) throws IOException
98      {
99          this (client, address, uri, new Timer("DefaultBayeuxClientTimer", true));
100     }
101 
102     /* ------------------------------------------------------------ */
103     /* (non-Javadoc)
104      * Returns the clientId
105      * @see dojox.cometd.Client#getId()
106      */
107     public String getId()
108     {
109         return _clientId;
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void start()
114     {
115         synchronized (_outQ)
116         {
117             if (!_initialized && _pull==null)
118                 _pull=new Handshake();
119         }
120     }
121 
122     /* ------------------------------------------------------------ */
123     public boolean isPolling()
124     {
125         synchronized (_outQ)
126         {
127             return _pull!=null;
128         }
129     }
130 
131     /* ------------------------------------------------------------ */
132     /** (non-Javadoc)
133      * @deprecated use {@link #deliver(Client, String, Object, String)}
134      * @see org.cometd.Client#deliver(org.cometd.Client, java.util.Map)
135      */
136     public void deliver(Client from, Message message)
137     {
138         synchronized (_inQ)
139         {
140             if (_mListeners==null)
141                 _inQ.add(message);
142             else
143             {
144                 for (MessageListener l : _mListeners)
145                     l.deliver(from,this,message);
146             }
147         }
148     }
149 
150     /* ------------------------------------------------------------ */
151     /* (non-Javadoc)
152      * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String, java.lang.Object, java.lang.String)
153      */
154     public void deliver(Client from, String toChannel, Object data, String id)
155     {
156         Message message = new MessageImpl();
157 
158         message.put(Bayeux.CHANNEL_FIELD,toChannel);
159         message.put(Bayeux.DATA_FIELD,data);
160         if (id!=null)
161             message.put(Bayeux.ID_FIELD,id);
162 
163         synchronized (_inQ)
164         {
165             if (_mListeners==null)
166                 _inQ.add(message);
167             else
168             {
169                 for (MessageListener l : _mListeners)
170                     l.deliver(from,this,message);
171             }
172         }
173     }
174 
175     /* ------------------------------------------------------------ */
176     /**
177      * @deprecated
178      */
179     public Listener getListener()
180     {
181         synchronized (_inQ)
182         {
183             return _listener;
184         }
185     }
186 
187     /* ------------------------------------------------------------ */
188     /* (non-Javadoc)
189      * @see dojox.cometd.Client#hasMessages()
190      */
191     public boolean hasMessages()
192     {
193         synchronized (_inQ)
194         {
195             return _inQ.size()>0;
196         }
197     }
198 
199     /* ------------------------------------------------------------ */
200     /* (non-Javadoc)
201      * @see dojox.cometd.Client#isLocal()
202      */
203     public boolean isLocal()
204     {
205         return false;
206     }
207 
208 
209     /* ------------------------------------------------------------ */
210     /* (non-Javadoc)
211      * @see dojox.cometd.Client#subscribe(java.lang.String)
212      */
213     private void publish(Message msg)
214     {
215         synchronized (_outQ)
216         {
217             _outQ.add(msg);
218 
219             if (_batch==0&&_initialized&&_push==null)
220                 _push=new Publish();
221         }
222     }
223 
224     /* ------------------------------------------------------------ */
225     /* (non-Javadoc)
226      * @see dojox.cometd.Client#publish(java.lang.String, java.lang.Object, java.lang.String)
227      */
228     public void publish(String toChannel, Object data, String msgId)
229     {
230         Message msg=new MessageImpl();
231         msg.put(Bayeux.CHANNEL_FIELD,toChannel);
232         msg.put(Bayeux.DATA_FIELD,data);
233         if (msgId!=null)
234             msg.put(Bayeux.ID_FIELD,msgId);
235         publish(msg);
236     }
237 
238     /* ------------------------------------------------------------ */
239     /* (non-Javadoc)
240      * @see dojox.cometd.Client#subscribe(java.lang.String)
241      */
242     public void subscribe(String toChannel)
243     {
244         Message msg=new MessageImpl();
245         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
246         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
247         publish(msg);
248     }
249 
250     /* ------------------------------------------------------------ */
251     /* (non-Javadoc)
252      * @see dojox.cometd.Client#unsubscribe(java.lang.String)
253      */
254     public void unsubscribe(String toChannel)
255     {
256         Message msg=new MessageImpl();
257         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
258         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
259         publish(msg);
260     }
261 
262     /* ------------------------------------------------------------ */
263     /* (non-Javadoc)
264      * @see dojox.cometd.Client#remove(boolean)
265      */
266     public void remove(boolean timeout)
267     {
268         Message msg=new MessageImpl();
269         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
270 
271         synchronized (_outQ)
272         {
273             _outQ.add(msg);
274 
275             _initialized=false;
276             _disconnecting=true;
277 
278             if (_batch==0&&_initialized&&_push==null)
279                 _push=new Publish();
280 
281         }
282     }
283 
284     /* ------------------------------------------------------------ */
285     /**
286      * @deprecated
287      */
288     public void setListener(Listener listener)
289     {
290         synchronized (_inQ)
291         {
292             if (_listener!=null)
293                 removeListener(_listener);
294             _listener=listener;
295             if (_listener!=null)
296                 addListener(_listener);
297         }
298     }
299 
300     /* ------------------------------------------------------------ */
301     /* (non-Javadoc)
302      * Removes all available messages from the inbound queue.
303      * If a listener is set then messages are not queued.
304      * @see dojox.cometd.Client#takeMessages()
305      */
306     public List<Message> takeMessages()
307     {
308         synchronized (_inQ)
309         {
310             LinkedList<Message> list=new LinkedList<Message>(_inQ);
311             _inQ.clear();
312             return list;
313         }
314     }
315 
316     /* ------------------------------------------------------------ */
317     /* (non-Javadoc)
318      * @see dojox.cometd.Client#endBatch()
319      */
320     public void endBatch()
321     {
322         synchronized (_outQ)
323         {
324             if (--_batch<=0)
325             {
326                 _batch=0;
327                 if ((_initialized||_disconnecting)&&_push==null&&_outQ.size()>0)
328                     _push=new Publish();
329             }
330         }
331     }
332 
333     /* ------------------------------------------------------------ */
334     /* (non-Javadoc)
335      * @see dojox.cometd.Client#startBatch()
336      */
337     public void startBatch()
338     {
339         synchronized (_outQ)
340         {
341             _batch++;
342         }
343     }
344 
345     /* ------------------------------------------------------------ */
346     /** Customize an Exchange.
347      * Called when an exchange is about to be sent to allow Cookies
348      * and Credentials to be customized.  Default implementation sets
349      * any cookies
350      */
351     protected void customize(HttpExchange exchange)
352     {
353         StringBuilder buf=null;
354         for (Cookie cookie : _cookies.values())
355         {
356 	    if (buf==null)
357 	        buf=new StringBuilder();
358             else
359 	        buf.append("; ");
360 	    buf.append(cookie.getName()); // TODO quotes
361 	    buf.append("=");
362 	    buf.append(cookie.getValue()); // TODO quotes
363         }
364 	if (buf!=null)
365             exchange.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
366     }
367 
368     /* ------------------------------------------------------------ */
369     public void setCookie(Cookie cookie)
370     {
371         _cookies.put(cookie.getName(),cookie);
372     }
373 
374     /* ------------------------------------------------------------ */
375     /** The base class for all bayeux exchanges.
376      */
377     private class Exchange extends HttpExchange.ContentExchange
378     {
379         Object[] _responses;
380         int _connectFailures;
381 
382         Exchange(String info)
383         {
384             setMethod("POST");
385             setScheme(HttpSchemes.HTTP_BUFFER);
386             setAddress(_address);
387             setURI(_uri+"/"+info);
388 
389             setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
390         }
391 
392         protected void setMessage(String message)
393         {
394             try
395             {
396                 if (_formEncoded)
397                     setRequestContent(new ByteArrayBuffer("message="+URLEncoder.encode(message,"utf-8")));
398                 else
399                     setRequestContent(new ByteArrayBuffer(message,"utf-8"));
400             }
401             catch (Exception e)
402             {
403                 Log.warn(e);
404             }
405         }
406 
407         protected void setMessages(Queue<Message> messages)
408         {
409             try
410             {
411                 for (Message msg : messages)
412                 {
413                     msg.put(Bayeux.CLIENT_FIELD,_clientId);
414                 }
415                 String json=JSON.toString(messages);
416 
417                 if (_formEncoded)
418                     setRequestContent(new ByteArrayBuffer("message="+URLEncoder.encode(json,"utf-8")));
419                 else
420                     setRequestContent(new ByteArrayBuffer(json,"utf-8"));
421 
422             }
423             catch (Exception e)
424             {
425                 Log.warn(e);
426             }
427 
428         }
429 
430         /* ------------------------------------------------------------ */
431         protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
432         {
433             super.onResponseStatus(version,status,reason);
434         }
435 
436         /* ------------------------------------------------------------ */
437         protected void onResponseHeader(Buffer name, Buffer value) throws IOException
438         {
439             super.onResponseHeader(name,value);
440             if (HttpHeaders.CACHE.getOrdinal(name)==HttpHeaders.SET_COOKIE_ORDINAL)
441             {
442                 String cname=null;
443                 String cvalue=null;
444 
445                 QuotedStringTokenizer tok=new QuotedStringTokenizer(value.toString(),"=;",false,false);
446                 tok.setSingle(false);
447 
448                 if (tok.hasMoreElements())
449                     cname=tok.nextToken();
450                 if (tok.hasMoreElements())
451                     cvalue=tok.nextToken();
452 
453                 Cookie cookie=new Cookie(cname,cvalue);
454 
455                 while (tok.hasMoreTokens())
456                 {
457                     String token=tok.nextToken();
458                     if ("Version".equalsIgnoreCase(token))
459                         cookie.setVersion(Integer.parseInt(tok.nextToken()));
460                     else if ("Comment".equalsIgnoreCase(token))
461                         cookie.setComment(tok.nextToken());
462                     else if ("Path".equalsIgnoreCase(token))
463                         cookie.setPath(tok.nextToken());
464                     else if ("Domain".equalsIgnoreCase(token))
465                         cookie.setDomain(tok.nextToken());
466                     else if ("Expires".equalsIgnoreCase(token))
467                     {
468                         tok.nextToken();
469                         // TODO
470                     }
471                     else if ("Max-Age".equalsIgnoreCase(token))
472                     {
473                         tok.nextToken();
474                         // TODO
475                     }
476                     else if ("Secure".equalsIgnoreCase(token))
477                         cookie.setSecure(true);
478                 }
479 
480                 BayeuxClient.this.setCookie(cookie);
481             }
482         }
483 
484         /* ------------------------------------------------------------ */
485         protected void onResponseComplete() throws IOException
486         {
487             super.onResponseComplete();
488 
489             if (getResponseStatus()==200)
490             {
491                 String content = getResponseContent();
492                 if (content==null || content.length()==0)
493                     throw new IllegalStateException();
494                 _responses=parse(content);
495             }
496         }
497 
498         /* ------------------------------------------------------------ */
499         protected void onExpire()
500         {
501             super.onExpire();
502         }
503 
504         /* ------------------------------------------------------------ */
505         protected void onConnectionFailed(Throwable ex)
506         {
507             super.onConnectionFailed(ex);
508             if (++_connectFailures<5)
509             {
510                 try
511                 {
512                     _client.send(this);
513                 }
514                 catch (IOException e)
515                 {
516                     Log.warn(e);
517                 }
518             }
519         }
520 
521         /* ------------------------------------------------------------ */
522         protected void onException(Throwable ex)
523         {
524             super.onException(ex);
525         }
526 
527     }
528 
529     /* ------------------------------------------------------------ */
530     /** The Bayeux handshake exchange.
531      * Negotiates a client Id and initializes the protocol.
532      *
533      */
534     private class Handshake extends Exchange
535     {
536         final static String __HANDSHAKE="[{"+"\"channel\":\"/meta/handshake\","+"\"version\":\"0.9\","+"\"minimumVersion\":\"0.9\""+"}]";
537 
538         Handshake()
539         {
540             super("handshake");
541             setMessage(__HANDSHAKE);
542 
543             try
544             {
545                 customize(this);
546                 _client.send(this);
547             }
548             catch (IOException e)
549             {
550                 Log.warn(e);
551             }
552         }
553 
554         /* ------------------------------------------------------------ */
555         /* (non-Javadoc)
556          * @see org.mortbay.jetty.client.HttpExchange#onException(java.lang.Throwable)
557          */
558         protected void onException(Throwable ex)
559         {
560             Log.warn("Handshake:"+ex);
561             Log.debug(ex);
562         }
563 
564         /* ------------------------------------------------------------ */
565         /* (non-Javadoc)
566          * @see org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
567          */
568         protected void onResponseComplete() throws IOException
569         {
570             super.onResponseComplete();
571             if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
572             {
573                 Map<?,?> response=(Map<?,?>)_responses[0];
574                 Boolean successful=(Boolean)response.get(Bayeux.SUCCESSFUL_FIELD);
575                 if (successful!=null&&successful.booleanValue())
576                 {
577                     _clientId=(String)response.get(Bayeux.CLIENT_FIELD);
578                     _pull=new Connect();
579                 }
580                 else
581                     throw new IOException("Handshake failed:"+_responses[0]);
582             }
583             else
584             {
585                 throw new IOException("Handshake failed: "+getResponseStatus());
586             }
587         }
588     }
589 
590     /* ------------------------------------------------------------ */
591     /** The Bayeux Connect exchange.
592      * Connect exchanges implement the long poll for Bayeux.
593      */
594     private class Connect extends Exchange
595     {
596         Connect()
597         {
598             super("connect");
599             String connect="{"+"\"channel\":\"/meta/connect\","+"\"clientId\":\""+_clientId+"\","+"\"connectionType\":\"long-polling\""+"}";
600             setMessage(connect);
601 
602             try
603             {
604                 customize(this);
605                 _client.send(this);
606             }
607             catch (IOException e)
608             {
609                 Log.warn(e);
610             }
611         }
612 
613         protected void onResponseComplete() throws IOException
614         {
615             super.onResponseComplete();
616             if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
617             {
618                 try
619                 {
620                     startBatch();
621 
622                     for (int i=0; i<_responses.length; i++)
623                     {
624                         Message msg=(Message)_responses[i];
625 
626                         if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
627                         {
628                             Boolean successful=(Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
629                             if (successful!=null&&successful.booleanValue())
630                             {
631                                 if (!_initialized)
632                                 {
633                                     _initialized=true;
634                                     synchronized (_outQ)
635                                     {
636                                         if (_outQ.size()>0)
637                                             _push=new Publish();
638                                     }
639                                 }
640 
641                                 Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
642                                 if (adviceField != null)
643                                     _advice = new Advice(adviceField);
644                                 
645                                 //if interval in advice, set up callback to expire at the interval value
646                                 //else  just send the connect
647                                 if (_advice != null && _advice.getInterval() > 0)
648                                 {
649                                     TimerTask task = new TimerTask()
650                                     {
651                                         public void run()
652                                         {
653                                             _pull=new Connect();
654                                         }
655                                     };
656                                     _timer.schedule(task, _advice.getInterval());
657                                 }
658                                 else
659                                     _pull=new Connect();
660                             }
661                             else
662                                 throw new IOException("Connect failed:"+_responses[0]);
663                         }
664 
665                         deliver(null,msg);
666                     }
667                 }
668                 finally
669                 {
670                     endBatch();
671                 }
672 
673             }
674             else
675             {
676                 throw new IOException("Connect failed: "+getResponseStatus());
677             }
678         }
679     }
680 
681     /* ------------------------------------------------------------ */
682     /**
683      * Publish message exchange.
684      * Sends messages to bayeux server and handles any messages received as a result.
685      */
686     private class Publish extends Exchange
687     {
688         Publish()
689         {
690             super("publish");
691             synchronized (_outQ)
692             {
693                 if (_outQ.size()==0)
694                     return;
695                 setMessages(_outQ);
696                 _outQ.clear();
697             }
698             try
699             {
700                 customize(this);
701                 _client.send(this);
702             }
703             catch (IOException e)
704             {
705                 Log.warn(e);
706             }
707         }
708 
709         /* ------------------------------------------------------------ */
710         /* (non-Javadoc)
711          * @see org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
712          */
713         protected void onResponseComplete() throws IOException
714         {
715             super.onResponseComplete();
716 
717             try
718             {
719                 synchronized (_outQ)
720                 {
721                     startBatch();
722                     _push=null;
723                 }
724 
725                 if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
726                 {
727 
728                     for (int i=0; i<_responses.length; i++)
729                     {
730                         Message msg=(Message)_responses[i];
731                         deliver(null,msg);
732                     }
733                 }
734                 else
735                 {
736                     throw new IOException("Reconnect failed: "+getResponseStatus());
737                 }
738             }
739             finally
740             {
741                 endBatch();
742             }
743         }
744     }
745 
746     public void addListener(ClientListener listener)
747     {
748         synchronized(_inQ)
749         {
750             if (listener instanceof MessageListener)
751             {
752                 if (_mListeners==null)
753                     _mListeners=new ArrayList<MessageListener>();
754                 _mListeners.add((MessageListener)listener);
755             }
756             if (listener instanceof RemoveListener)
757             {
758                 if (_rListeners==null)
759                     _rListeners=new ArrayList<RemoveListener>();
760                 _rListeners.add((RemoveListener)listener);
761             }
762         }
763     }
764 
765     public void removeListener(ClientListener listener)
766     {
767         synchronized(_inQ)
768         {
769             if (listener instanceof MessageListener)
770             {
771                 if (_mListeners!=null)
772                     _mListeners.remove((MessageListener)listener);
773             }
774             if (listener instanceof RemoveListener)
775             {
776                 if (_rListeners!=null)
777                     _rListeners.remove((RemoveListener)listener);
778             }
779         }
780     }
781 
782     public int getMaxQueue()
783     {
784         return -1;
785     }
786 
787     public Queue<Message> getQueue()
788     {
789         return _inQ;
790     }
791 
792     public void setMaxQueue(int max)
793     {
794         if( max!=-1)
795             throw new UnsupportedOperationException();
796     }
797 }