View Javadoc

1   // ========================================================================
2   // Copyright 2006-20078 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd.client;
16  
17  import java.io.IOException;
18  import java.net.URLEncoder;
19  import java.text.ParseException;
20  import java.text.SimpleDateFormat;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Queue;
27  import java.util.Timer;
28  import java.util.TimerTask;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.TimeUnit;
31  import javax.servlet.http.Cookie;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.Client;
35  import org.cometd.ClientListener;
36  import org.cometd.Extension;
37  import org.cometd.Message;
38  import org.cometd.MessageListener;
39  import org.cometd.RemoveListener;
40  import org.mortbay.cometd.MessageImpl;
41  import org.mortbay.cometd.MessagePool;
42  import org.mortbay.component.AbstractLifeCycle;
43  import org.mortbay.io.Buffer;
44  import org.mortbay.io.ByteArrayBuffer;
45  import org.mortbay.jetty.HttpHeaders;
46  import org.mortbay.jetty.HttpSchemes;
47  import org.mortbay.jetty.HttpURI;
48  import org.mortbay.jetty.client.Address;
49  import org.mortbay.jetty.client.ContentExchange;
50  import org.mortbay.jetty.client.HttpClient;
51  import org.mortbay.jetty.client.HttpExchange;
52  import org.mortbay.log.Log;
53  import org.mortbay.util.ArrayQueue;
54  import org.mortbay.util.LazyList;
55  import org.mortbay.util.QuotedStringTokenizer;
56  import org.mortbay.util.ajax.JSON;
57  
58  /* ------------------------------------------------------------ */
59  /**
60   * Bayeux protocol Client.
61   * <p>
62   * Implements a Bayeux Ajax Push client as part of the cometd project.
63   * <p>
64   * The HttpClient attributes are used to share a Timer and MessagePool instance
65   * between all Bayeux clients sharing the same HttpClient.
66   *
67   * @see http://cometd.org
68   * @author gregw
69   *
70   */
71  public class BayeuxClient extends AbstractLifeCycle implements Client
72  {
73      private final static String __TIMER="org.mortbay.cometd.client.Timer";
74      private final static String __JSON="org.mortbay.cometd.client.JSON";
75      private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
76      protected HttpClient _httpClient;
77  
78      protected MessagePool _msgPool;
79      private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();  // queue of incoming messages
80      private ArrayQueue<Message> _outQ = new ArrayQueue<Message>(); // queue of outgoing messages
81      protected Address _cometdAddress;
82      private Exchange _pull;
83      private Exchange _push;
84      private String _path = "/cometd";
85      private boolean _initialized = false;
86      private boolean _disconnecting = false;
87      private boolean _handshook = false;
88      private String _clientId;
89      private org.cometd.Listener _listener;
90      private List<RemoveListener> _rListeners;
91      private List<MessageListener> _mListeners;
92      private int _batch;
93      private boolean _formEncoded;
94      private Map<String, ExpirableCookie> _cookies = new ConcurrentHashMap<String, ExpirableCookie>();
95      private Advice _advice;
96      private Timer _timer;
97      private boolean _ownTimer;
98      private int _backoffInterval = 0;
99      private int _backoffIncrement = 1000;
100     private int _backoffMaxInterval = 60000;
101     private Buffer _scheme;
102     protected Extension[] _extensions;
103     protected JSON _jsonOut;
104 
105     /* ------------------------------------------------------------ */
106     public BayeuxClient(HttpClient client, String url)
107     {
108         this(client,url,null);
109     }
110 
111     /* ------------------------------------------------------------ */
112     public BayeuxClient(HttpClient client, String url, Timer timer)
113     {
114         HttpURI uri = new HttpURI(url);
115         _httpClient = client;
116         _cometdAddress = new Address(uri.getHost(),uri.getPort());
117         _path=uri.getPath();
118         _timer = timer;
119         _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
120     }
121 
122     /* ------------------------------------------------------------ */
123     public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
124     {
125         _httpClient = client;
126         _cometdAddress = address;
127         _path = path;
128         _timer = timer;
129     }
130 
131     /* ------------------------------------------------------------ */
132     public BayeuxClient(HttpClient client, Address address, String uri)
133     {
134         this(client,address,uri,null);
135     }
136 
137     /* ------------------------------------------------------------ */
138     public void addExtension(Extension ext)
139     {
140         _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
141     }
142 
143     public void removeExtension(Extension extension)
144     {
145         _extensions = (Extension[])LazyList.removeFromArray(_extensions,extension);
146     }
147 
148     /* ------------------------------------------------------------ */
149     Extension[] getExtensions()
150     {
151         return _extensions;
152     }
153 
154     /* ------------------------------------------------------------ */
155     /**
156      * If unable to connect/handshake etc, even if following the interval in the
157      * advice, wait for this interval initially, and try again.
158      *
159      * @param interval
160      */
161     public void setBackOffInterval(int interval)
162     {
163         _backoffInterval = interval;
164     }
165 
166     /* ------------------------------------------------------------ */
167     /**
168      * @return the backoff interval to wait before retrying an unsuccessful
169      * or failed message
170      */
171     public int getBackoffInterval()
172     {
173         return _backoffInterval;
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * @deprecated We retry an infinite number of times.
179      * use {@link #getBackoffIncrement()} to set limits
180      */
181     public void setBackoffMaxRetries(int retries)
182     {
183     }
184 
185     /* ------------------------------------------------------------ */
186     /**
187      * @deprecated
188      */
189     public int getBackoffMaxRetries()
190     {
191         return -1;
192     }
193 
194     /* ------------------------------------------------------------ */
195     /**
196      . Each retry will increment by this
197      * intervel, until we reach _backoffMaxInterval
198      *
199     */
200     public void setBackoffIncrement(int interval)
201     {
202         _backoffIncrement = interval;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /**
207      * @return the backoff interval used to increase the backoff time when
208      * retrying an unsuccessful or failed message.
209      */
210     public int getBackoffIncrement()
211     {
212         return _backoffIncrement;
213     }
214 
215     /* ------------------------------------------------------------ */
216     public void setBackoffMaxInterval(int interval)
217     {
218         _backoffMaxInterval = interval;
219     }
220 
221     public int getBackoffMaxInterval()
222     {
223         return _backoffMaxInterval;
224     }
225 
226     /* ------------------------------------------------------------ */
227     /*
228      * (non-Javadoc) Returns the clientId
229      *
230      * @see dojox.cometd.Client#getId()
231      */
232     public String getId()
233     {
234         return _clientId;
235     }
236 
237     /* ------------------------------------------------------------ */
238     protected void doStart() throws Exception
239     {
240         if (!_httpClient.isStarted())
241             throw new IllegalStateException("!HttpClient.isStarted()");
242 
243         synchronized (_httpClient)
244         {
245             if (_jsonOut == null)
246             {
247                 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
248                 if (_jsonOut==null)
249                 {
250                     _jsonOut = new JSON();
251                     _httpClient.setAttribute(__JSON,_jsonOut);
252                 }
253             }
254 
255             if (_timer == null)
256             {
257                 _timer = (Timer)_httpClient.getAttribute(__TIMER);
258                 if (_timer==null)
259                 {
260                     _ownTimer=true;
261                     _timer = new Timer(__TIMER+"@"+hashCode(),true);
262                     _httpClient.setAttribute(__TIMER,_timer);
263                 }
264             }
265 
266             if (_msgPool == null)
267             {
268                 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
269                 if (_msgPool==null)
270                 {
271                     _msgPool = new MessagePool();
272                     _httpClient.setAttribute(__MSGPOOL,_msgPool);
273                 }
274             }
275         }
276         _disconnecting=false;
277         _pull=null;
278         _push=null;
279         super.doStart();
280         synchronized (_outQ)
281         {
282             if (!_initialized && _pull == null)
283             {
284                 _pull = new Handshake();
285                 send((Exchange)_pull,false);
286             }
287         }
288     }
289 
290     /* ------------------------------------------------------------ */
291     protected void doStop() throws Exception
292     {
293         if (_ownTimer)
294         {
295             _timer.cancel();
296             _timer=null;
297         }
298         
299         if (!_disconnecting)
300             disconnect();
301         super.doStop();
302     }
303 
304     /* ------------------------------------------------------------ */
305     public boolean isPolling()
306     {
307         synchronized (_outQ)
308         {
309             return isRunning() && (_pull != null);
310         }
311     }
312 
313     /* ------------------------------------------------------------ */
314     /**
315      * (non-Javadoc)
316      */
317     public void deliver(Client from, Message message)
318     {
319         if (!isRunning())
320             throw new IllegalStateException("Not running");
321 
322         synchronized (_inQ)
323         {
324             if (_mListeners == null)
325                 _inQ.add(message);
326             else
327             {
328                 for (MessageListener l : _mListeners)
329                 {
330                     if (l instanceof MessageListener.Synchronous)
331                         notifyMessageListener(l, from, message);
332                 }
333             }
334         }
335 
336         if (_mListeners !=null)
337             for (MessageListener l : _mListeners)
338                 if (!(l instanceof MessageListener.Synchronous))
339                     notifyMessageListener(l, from, message);
340     }
341 
342     private void notifyMessageListener(MessageListener listener, Client from, Message message)
343     {
344         try
345         {
346             listener.deliver(from, this, message);
347         }
348         catch (Throwable x)
349         {
350             Log.debug(x);
351         }
352     }
353 
354     /* ------------------------------------------------------------ */
355     /*
356      * (non-Javadoc)
357      *
358      * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String,
359      * java.lang.Object, java.lang.String)
360      */
361     public void deliver(Client from, String toChannel, Object data, String id)
362     {
363         if (!isRunning())
364             throw new IllegalStateException("Not running");
365 
366         MessageImpl message = _msgPool.newMessage();
367 
368         message.put(Bayeux.CHANNEL_FIELD,toChannel);
369         message.put(Bayeux.DATA_FIELD,data);
370         if (id != null)
371             message.put(Bayeux.ID_FIELD,id);
372 
373         synchronized (_inQ)
374         {
375             if (_mListeners == null)
376             {
377                 message.incRef();
378                 _inQ.add(message);
379             }
380             else
381             {
382                 for (MessageListener l : _mListeners)
383                     if (l instanceof MessageListener.Synchronous)
384                         notifyMessageListener(l, from, message);
385             }
386         }
387 
388         if (_mListeners !=null)
389             for (MessageListener l : _mListeners)
390                 if (!(l instanceof MessageListener.Synchronous))
391                     notifyMessageListener(l, from, message);
392 
393         message.decRef();
394     }
395 
396     /* ------------------------------------------------------------ */
397     /**
398      * @deprecated
399      */
400     public org.cometd.Listener getListener()
401     {
402         synchronized (_inQ)
403         {
404             return _listener;
405         }
406     }
407 
408     /* ------------------------------------------------------------ */
409     /*
410      * (non-Javadoc)
411      *
412      * @see dojox.cometd.Client#hasMessages()
413      */
414     public boolean hasMessages()
415     {
416         synchronized (_inQ)
417         {
418             return _inQ.size() > 0;
419         }
420     }
421 
422     /* ------------------------------------------------------------ */
423     /*
424      * (non-Javadoc)
425      *
426      * @see dojox.cometd.Client#isLocal()
427      */
428     public boolean isLocal()
429     {
430         return false;
431     }
432 
433     /* ------------------------------------------------------------ */
434     /*
435      * (non-Javadoc)
436      *
437      * @see dojox.cometd.Client#subscribe(java.lang.String)
438      */
439     private void publish(MessageImpl msg)
440     {
441         msg.incRef();
442         synchronized (_outQ)
443         {
444             _outQ.add(msg);
445 
446             if (_batch == 0 && _initialized && _push == null)
447             {
448                 _push = new Publish();
449                 try
450                 {
451                     send(_push);
452                 }
453                 catch (IOException e)
454                 {
455                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
456                 }
457                 catch (IllegalStateException e)
458                 {
459                     metaPublishFail(e,((Publish)_push).getOutboundMessages());
460                 }
461             }
462         }
463     }
464 
465     /* ------------------------------------------------------------ */
466     /*
467      * (non-Javadoc)
468      *
469      * @see dojox.cometd.Client#publish(java.lang.String, java.lang.Object,
470      * java.lang.String)
471      */
472     public void publish(String toChannel, Object data, String msgId)
473     {
474         if (!isRunning() || _disconnecting)
475             throw new IllegalStateException("Not running");
476 
477         MessageImpl msg = _msgPool.newMessage();
478         msg.put(Bayeux.CHANNEL_FIELD,toChannel);
479         msg.put(Bayeux.DATA_FIELD,data);
480         if (msgId != null)
481             msg.put(Bayeux.ID_FIELD,msgId);
482         publish(msg);
483         msg.decRef();
484     }
485 
486     /* ------------------------------------------------------------ */
487     /*
488      * (non-Javadoc)
489      *
490      * @see dojox.cometd.Client#subscribe(java.lang.String)
491      */
492     public void subscribe(String toChannel)
493     {
494         if (!isRunning() || _disconnecting)
495             throw new IllegalStateException("Not running");
496 
497         MessageImpl msg = _msgPool.newMessage();
498         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
499         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
500         publish(msg);
501         msg.decRef();
502     }
503 
504     /* ------------------------------------------------------------ */
505     /*
506      * (non-Javadoc)
507      *
508      * @see dojox.cometd.Client#unsubscribe(java.lang.String)
509      */
510     public void unsubscribe(String toChannel)
511     {
512         if (!isRunning() || _disconnecting)
513             throw new IllegalStateException("Not running");
514 
515         MessageImpl msg = _msgPool.newMessage();
516         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
517         msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
518         publish(msg);
519         msg.decRef();
520     }
521 
522     /* ------------------------------------------------------------ */
523     /**
524      * Disconnect this client.
525      * @deprecated use {@link #disconnect()}
526      */
527     public void remove()
528     {
529         disconnect();
530     }
531 
532     /* ------------------------------------------------------------ */
533     /**
534      * Disconnect this client.
535      */
536     public void disconnect()
537     {
538         if (isStopped() || _disconnecting)
539             throw new IllegalStateException("Not running");
540 
541         MessageImpl msg = _msgPool.newMessage();
542         msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
543 
544         synchronized (_outQ)
545         {
546             _outQ.add(msg);
547             _disconnecting = true;
548             if (_batch == 0 && _initialized && _push == null)
549             {
550                 _push = new Publish();
551                 try
552                 {
553                     send(_push);
554                 }
555                 catch (IOException e)
556                 {
557                     Log.warn(e.toString());
558                     Log.debug(e);
559                     send(_push,true);
560                 }
561             }
562             _initialized = false;
563         }
564     }
565 
566     /* ------------------------------------------------------------ */
567     /**
568      * @deprecated
569      */
570     public void setListener(org.cometd.Listener listener)
571     {
572         synchronized (_inQ)
573         {
574             if (_listener != null)
575                 removeListener(_listener);
576             _listener = listener;
577             if (_listener != null)
578                 addListener(_listener);
579         }
580     }
581 
582     /* ------------------------------------------------------------ */
583     /*
584      * (non-Javadoc) Removes all available messages from the inbound queue. If a
585      * listener is set then messages are not queued.
586      *
587      * @see dojox.cometd.Client#takeMessages()
588      */
589     public List<Message> takeMessages()
590     {
591         final LinkedList<Message> list;
592         synchronized (_inQ)
593         {
594             list = new LinkedList<Message>(_inQ);
595             _inQ.clear();
596         }
597         for (Message m : list)
598             if (m instanceof MessageImpl)
599                 ((MessageImpl)m).decRef();
600         return list;
601     }
602 
603     /* ------------------------------------------------------------ */
604     /*
605      * (non-Javadoc)
606      *
607      * @see dojox.cometd.Client#endBatch()
608      */
609     public void endBatch()
610     {
611         synchronized (_outQ)
612         {
613             if (--_batch <= 0)
614             {
615                 _batch = 0;
616                 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
617                 {
618                     _push = new Publish();
619                     try
620                     {
621                         send(_push);
622                     }
623                     catch (IOException e)
624                     {
625                         metaPublishFail(e,((Publish)_push).getOutboundMessages());
626                     }
627                 }
628             }
629         }
630     }
631 
632     /* ------------------------------------------------------------ */
633     /*
634      * (non-Javadoc)
635      *
636      * @see dojox.cometd.Client#startBatch()
637      */
638     public void startBatch()
639     {
640         if (isStopped())
641             throw new IllegalStateException("Not running");
642 
643         synchronized (_outQ)
644         {
645             _batch++;
646         }
647     }
648 
649     /* ------------------------------------------------------------ */
650     /**
651      * Customize an Exchange. Called when an exchange is about to be sent to
652      * allow Cookies and Credentials to be customized. Default implementation
653      * sets any cookies
654      */
655     protected void customize(HttpExchange exchange)
656     {
657         StringBuilder builder = null;
658         for (String cookieName : _cookies.keySet())
659         {
660             if (builder == null)
661                 builder = new StringBuilder();
662             else
663                 builder.append("; ");
664 
665             // Expiration is handled by getCookie()
666             Cookie cookie = getCookie(cookieName);
667             if (cookie != null)
668             {
669                 builder.append(cookie.getName()); // TODO quotes
670                 builder.append("=");
671                 builder.append(cookie.getValue()); // TODO quotes
672             }
673         }
674 
675         if (builder != null)
676             exchange.setRequestHeader(HttpHeaders.COOKIE,builder.toString());
677 
678         if (_scheme!=null)
679             exchange.setScheme(_scheme);
680     }
681 
682     /* ------------------------------------------------------------ */
683     public void setCookie(Cookie cookie)
684     {
685         long expirationTime = System.currentTimeMillis();
686         int maxAge = cookie.getMaxAge();
687         if (maxAge < 0)
688             expirationTime = -1L;
689         else
690             expirationTime += maxAge * 1000;
691 
692         ExpirableCookie expirableCookie = new ExpirableCookie(cookie, expirationTime);
693         _cookies.put(cookie.getName(), expirableCookie);
694     }
695 
696     public Cookie getCookie(String name)
697     {
698         ExpirableCookie cookie = _cookies.get(name);
699         if (cookie != null)
700         {
701             if (cookie.isExpired())
702             {
703                 _cookies.remove(name);
704                 cookie = null;
705             }
706         }
707         return cookie == null ? null : cookie.cookie;
708     }
709 
710     /* ------------------------------------------------------------ */
711     /* ------------------------------------------------------------ */
712     /* ------------------------------------------------------------ */
713     /**
714      * The base class for all bayeux exchanges.
715      */
716     protected class Exchange extends ContentExchange
717     {
718         Message[] _responses;
719         int _connectFailures;
720         int _backoff = _backoffInterval;
721         String _json;
722 
723         /* ------------------------------------------------------------ */
724         Exchange(String info)
725         {
726             setMethod("POST");
727             setScheme(HttpSchemes.HTTP_BUFFER);
728             setAddress(_cometdAddress);
729             setURI(_path + "/" + info);
730             setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
731         }
732 
733         /* ------------------------------------------------------------ */
734         public int getBackoff()
735         {
736             return _backoff;
737         }
738 
739         /* ------------------------------------------------------------ */
740         public void incBackoff()
741         {
742             _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
743         }
744 
745         /* ------------------------------------------------------------ */
746         protected void setMessage(String message)
747         {
748             message=extendOut(message);
749             setJson(message);
750         }
751 
752         /* ------------------------------------------------------------ */
753         protected void setJson(String json)
754         {
755             try
756             {
757                 _json = json;
758 
759                 if (_formEncoded)
760                     setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
761                 else
762                     setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
763             }
764             catch (Exception e)
765             {
766                 Log.ignore(e);
767                 setRequestContent(new ByteArrayBuffer(_json));
768             }
769         }
770 
771         /* ------------------------------------------------------------ */
772         protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
773         {
774             super.onResponseStatus(version,status,reason);
775         }
776 
777         /* ------------------------------------------------------------ */
778         protected void onResponseHeader(Buffer name, Buffer value) throws IOException
779         {
780             super.onResponseHeader(name,value);
781             if (!isRunning())
782                 return;
783 
784             if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
785             {
786                 String cname = null;
787                 String cvalue = null;
788 
789                 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
790                 tok.setSingle(false);
791 
792                 if (tok.hasMoreElements())
793                     cname = tok.nextToken();
794                 if (tok.hasMoreElements())
795                     cvalue = tok.nextToken();
796 
797                 Cookie cookie = new Cookie(cname,cvalue);
798 
799                 while (tok.hasMoreTokens())
800                 {
801                     String token = tok.nextToken();
802                     if ("Version".equalsIgnoreCase(token))
803                         cookie.setVersion(Integer.parseInt(tok.nextToken()));
804                     else if ("Comment".equalsIgnoreCase(token))
805                         cookie.setComment(tok.nextToken());
806                     else if ("Path".equalsIgnoreCase(token))
807                         cookie.setPath(tok.nextToken());
808                     else if ("Domain".equalsIgnoreCase(token))
809                         cookie.setDomain(tok.nextToken());
810                     else if ("Expires".equalsIgnoreCase(token))
811                     {
812                         try
813                         {
814                             Date date = new SimpleDateFormat("EEE, dd-MMM-yy HH:mm:ss 'GMT'").parse(tok.nextToken());
815                             Long maxAge = TimeUnit.MILLISECONDS.toSeconds(date.getTime() - System.currentTimeMillis());
816                             cookie.setMaxAge(maxAge > 0 ? maxAge.intValue() : 0);
817                         }
818                         catch (ParseException ignored)
819                         {
820                         }
821                     }
822                     else if ("Max-Age".equalsIgnoreCase(token))
823                     {
824                         try
825                         {
826                             int maxAge = Integer.parseInt(tok.nextToken());
827                             cookie.setMaxAge(maxAge);
828                         }
829                         catch (NumberFormatException ignored)
830                         {
831                         }
832                     }
833                     else if ("Secure".equalsIgnoreCase(token))
834                         cookie.setSecure(true);
835                 }
836 
837                 BayeuxClient.this.setCookie(cookie);
838             }
839         }
840 
841         /* ------------------------------------------------------------ */
842         protected void onResponseComplete() throws IOException
843         {
844             if (!isRunning())
845                 return;
846 
847             super.onResponseComplete();
848 
849             if (getResponseStatus() == 200)
850             {
851                 String content = getResponseContent();
852                 // TODO
853                 if (content == null || content.length() == 0)
854                     throw new IllegalStateException();
855                 _responses = _msgPool.parse(content);
856 
857                 if (_responses!=null)
858                     for (int i=0;i<_responses.length;i++)
859                         extendIn(_responses[i]);
860             }
861         }
862 
863         /* ------------------------------------------------------------ */
864         protected void resend(boolean backoff)
865         {
866             if (!isRunning())
867                 return;
868 
869             final boolean disconnecting;
870             synchronized (_outQ)
871             {
872                 disconnecting=_disconnecting;
873             }
874             if (disconnecting)
875             {
876                 try{stop();}catch(Exception e){Log.ignore(e);}
877                 return;
878             }
879 
880             setJson(_json);
881             if (!send(this,backoff))
882                 Log.warn("Retries exhausted"); // giving up
883         }
884 
885         /* ------------------------------------------------------------ */
886         protected void recycle()
887         {
888             if (_responses!=null)
889                 for (Message msg:_responses)
890                     if (msg instanceof MessageImpl)
891                         ((MessageImpl)msg).decRef();
892             _responses=null;
893         }
894     }
895 
896     /* ------------------------------------------------------------ */
897     /**
898      * The Bayeux handshake exchange. Negotiates a client Id and initializes the
899      * protocol.
900      *
901      */
902     protected class Handshake extends Exchange
903     {
904         public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
905 
906         Handshake()
907         {
908             super("handshake");
909             setMessage(__HANDSHAKE);
910         }
911 
912         /* ------------------------------------------------------------ */
913         /*
914          * (non-Javadoc)
915          *
916          * @see
917          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
918          */
919         protected void onResponseComplete() throws IOException
920         {
921             super.onResponseComplete();
922 
923             if (!isRunning())
924                 return;
925 
926             if (_disconnecting)
927             {
928                 Message error=_msgPool.newMessage();
929                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
930                 error.put("failure","expired");
931                 metaHandshake(false,false,error);
932                 try{stop();}catch(Exception e){Log.ignore(e);}
933                 return;
934             }
935 
936             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
937             {
938                 MessageImpl response = (MessageImpl)_responses[0];
939                 boolean successful = response.isSuccessful();
940 
941                 // Get advice if there is any
942                 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
943                 if (adviceField != null)
944                     _advice = new Advice(adviceField);
945 
946                 if (successful)
947                 {
948                     _handshook = true;
949                     if (Log.isDebugEnabled())
950                         Log.debug("Successful handshake, sending connect");
951                     _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
952 
953                     metaHandshake(true,_handshook,response);
954                     _pull = new Connect();
955                     send(_pull,false);
956                 }
957                 else
958                 {
959                     metaHandshake(false,false,response);
960                     _handshook = false;
961                     if (_advice != null && _advice.isReconnectNone())
962                         throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
963                     else if (_advice != null && _advice.isReconnectHandshake())
964                     {
965                         _pull = new Handshake();
966                         if (!send(_pull,true))
967                             throw new IOException("Handshake, retries exhausted");
968                     }
969                     else
970                     // assume retry = reconnect?
971                     {
972                         _pull = new Connect();
973                         if (!send(_pull,true))
974                             throw new IOException("Connect after handshake, retries exhausted");
975                     }
976                 }
977             }
978             else
979             {
980                 Message error=_msgPool.newMessage();
981                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
982                 error.put("status",new Integer(getResponseStatus()));
983                 error.put("content",getResponseContent());
984 
985                 metaHandshake(false,false,error);
986                 resend(true);
987             }
988 
989             recycle();
990         }
991 
992         /* ------------------------------------------------------------ */
993         protected void onExpire()
994         {
995             // super.onExpire();
996             Message error=_msgPool.newMessage();
997             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
998             error.put("failure","expired");
999             metaHandshake(false,false,error);
1000             resend(true);
1001         }
1002 
1003         /* ------------------------------------------------------------ */
1004         protected void onConnectionFailed(Throwable ex)
1005         {
1006             // super.onConnectionFailed(ex);
1007             Message error=_msgPool.newMessage();
1008             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1009             error.put("failure",ex.toString());
1010             error.put("exception",ex);
1011             ex.printStackTrace();
1012             metaHandshake(false,false,error);
1013             resend(true);
1014         }
1015 
1016         /* ------------------------------------------------------------ */
1017         protected void onException(Throwable ex)
1018         {
1019             // super.onException(ex);
1020             Message error=_msgPool.newMessage();
1021             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1022             error.put("failure",ex.toString());
1023             error.put("exception",ex);
1024             metaHandshake(false,false,error);
1025             resend(true);
1026         }
1027     }
1028 
1029     /* ------------------------------------------------------------ */
1030     /**
1031      * The Bayeux Connect exchange. Connect exchanges implement the long poll
1032      * for Bayeux.
1033      */
1034     protected class Connect extends Exchange
1035     {
1036         String _connectString;
1037 
1038         Connect()
1039         {
1040             super("connect");
1041             _connectString = "[{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}]";
1042             setMessage(_connectString);
1043         }
1044 
1045         protected void onResponseComplete() throws IOException
1046         {
1047             super.onResponseComplete();
1048             if (!isRunning())
1049                 return;
1050 
1051             if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1052             {
1053                 try
1054                 {
1055                     startBatch();
1056 
1057                     for (int i = 0; i < _responses.length; i++)
1058                     {
1059                         Message msg = _responses[i];
1060 
1061                         // get advice if there is any
1062                         Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
1063                         if (adviceField != null)
1064                             _advice = new Advice(adviceField);
1065 
1066                         if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
1067                         {
1068                             Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
1069                             if (successful != null && successful.booleanValue())
1070                             {
1071                                 metaConnect(true,msg);
1072 
1073                                 if (!isRunning())
1074                                     break;
1075 
1076                                 synchronized (_outQ)
1077                                 {
1078                                     if (_disconnecting)
1079                                         continue;
1080 
1081                                     if (!isInitialized())
1082                                     {
1083                                         setInitialized(true);
1084                                         {
1085                                             if (_outQ.size() > 0)
1086                                             {
1087                                                 _push = new Publish();
1088                                                 send(_push);
1089                                             }
1090                                         }
1091                                     }
1092 
1093                                 }
1094                                 // send a Connect (ie longpoll) possibly with
1095                                 // delay according to interval advice
1096                                 _pull = new Connect();
1097                                 send(_pull,false);
1098                             }
1099                             else
1100                             {
1101                                 // received a failure to our connect message,
1102                                 // check the advice to see what to do:
1103                                 // reconnect: none = hard error
1104                                 // reconnect: handshake = send a handshake
1105                                 // message
1106                                 // reconnect: retry = send another connect,
1107                                 // possibly using interval
1108 
1109                                 setInitialized(false);
1110                                 metaConnect(false,msg);
1111 
1112                                 synchronized(_outQ)
1113                                 {
1114                                     if (!isRunning()||_disconnecting)
1115                                         break;
1116                                 }
1117 
1118                                 if (_advice != null && _advice.isReconnectNone())
1119                                     throw new IOException("Connect failed, advice reconnect=none");
1120                                 else if (_advice != null && _advice.isReconnectHandshake())
1121                                 {
1122                                     if (Log.isDebugEnabled())
1123                                         Log.debug("connect received success=false, advice is to rehandshake");
1124                                     _pull = new Handshake();
1125                                     send(_pull,true);
1126                                 }
1127                                 else
1128                                 {
1129                                     // assume retry = reconnect
1130                                     if (Log.isDebugEnabled())
1131                                         Log.debug("Assuming retry=reconnect");
1132                                     resend(true);
1133                                 }
1134                             }
1135                         }
1136                         deliver(null,msg);
1137                     }
1138                 }
1139                 finally
1140                 {
1141                     endBatch();
1142                 }
1143             }
1144             else
1145             {
1146                 Message error=_msgPool.newMessage();
1147                 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1148                 error.put("status",getResponseStatus());
1149                 error.put("content",getResponseContent());
1150                 metaConnect(false,error);
1151                 resend(true);
1152             }
1153 
1154             recycle();
1155         }
1156 
1157         /* ------------------------------------------------------------ */
1158         protected void onExpire()
1159         {
1160             // super.onExpire();
1161             setInitialized(false);
1162             Message error=_msgPool.newMessage();
1163             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1164             error.put("failure","expired");
1165             metaConnect(false,error);
1166             resend(true);
1167         }
1168 
1169         /* ------------------------------------------------------------ */
1170         protected void onConnectionFailed(Throwable ex)
1171         {
1172             // super.onConnectionFailed(ex);
1173             setInitialized(false);
1174             Message error=_msgPool.newMessage();
1175             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1176             error.put("failure",ex.toString());
1177             error.put("exception",ex);
1178             metaConnect(false,error);
1179             resend(true);
1180         }
1181 
1182         /* ------------------------------------------------------------ */
1183         protected void onException(Throwable ex)
1184         {
1185             // super.onException(ex);
1186             setInitialized(false);
1187             Message error=_msgPool.newMessage();
1188             error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1189             error.put("failure",ex.toString());
1190             error.put("exception",ex);
1191             metaConnect(false,error);
1192             resend(true);
1193         }
1194     }
1195 
1196     /* ------------------------------------------------------------ */
1197     /**
1198      * Publish message exchange. Sends messages to bayeux server and handles any
1199      * messages received as a result.
1200      */
1201     protected class Publish extends Exchange
1202     {
1203         Publish()
1204         {
1205             super("publish");
1206 
1207             StringBuffer json = new StringBuffer(256);
1208             synchronized (json)
1209             {
1210                 synchronized (_outQ)
1211                 {
1212                     int s=_outQ.size();
1213                     if (s == 0)
1214                         return;
1215 
1216                     for (int i=0;i<s;i++)
1217                     {
1218                         Message message = _outQ.getUnsafe(i);
1219                         message.put(Bayeux.CLIENT_FIELD,_clientId);
1220                         extendOut(message);
1221 
1222                         json.append(i==0?'[':',');
1223                         _jsonOut.append(json,message);
1224 
1225                         if (message instanceof MessageImpl)
1226                             ((MessageImpl)message).decRef();
1227                     }
1228                     json.append(']');
1229                     _outQ.clear();
1230                     setJson(json.toString());
1231                 }
1232             }
1233         }
1234 
1235         protected Message[] getOutboundMessages()
1236         {
1237             try
1238             {
1239                 return _msgPool.parse(_json);
1240             }
1241             catch (IOException e)
1242             {
1243                 Log.warn("Error converting outbound messages");
1244                 if (Log.isDebugEnabled())
1245                     Log.debug(e);
1246                 return null;
1247             }
1248         }
1249 
1250         /* ------------------------------------------------------------ */
1251         /*
1252          * (non-Javadoc)
1253          *
1254          * @see
1255          * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete()
1256          */
1257         protected void onResponseComplete() throws IOException
1258         {
1259             if (!isRunning())
1260                 return;
1261 
1262             super.onResponseComplete();
1263             try
1264             {
1265                 synchronized (_outQ)
1266                 {
1267                     startBatch();
1268                     _push = null;
1269                 }
1270 
1271                 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
1272                 {
1273                     for (int i = 0; i < _responses.length; i++)
1274                     {
1275                         MessageImpl msg = (MessageImpl)_responses[i];
1276 
1277                         deliver(null,msg);
1278                         if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
1279                         {
1280                             if (isStarted())
1281                             {
1282                                 try{stop();}catch(Exception e){Log.ignore(e);}
1283                             }
1284                             break;
1285                         }
1286                     }
1287                 }
1288                 else
1289                 {
1290                     Log.warn("Publish, error=" + getResponseStatus());
1291                 }
1292             }
1293             finally
1294             {
1295                 endBatch();
1296             }
1297             recycle();
1298         }
1299 
1300         /* ------------------------------------------------------------ */
1301         protected void onExpire()
1302         {
1303             super.onExpire();
1304             metaPublishFail(null,this.getOutboundMessages());
1305             if (_disconnecting)
1306             {
1307                 try{stop();}catch(Exception e){Log.ignore(e);}
1308             }
1309         }
1310 
1311         /* ------------------------------------------------------------ */
1312         protected void onConnectionFailed(Throwable ex)
1313         {
1314             super.onConnectionFailed(ex);
1315             metaPublishFail(ex,this.getOutboundMessages());
1316             if (_disconnecting)
1317             {
1318                 try{stop();}catch(Exception e){Log.ignore(e);}
1319             }
1320         }
1321 
1322         /* ------------------------------------------------------------ */
1323         protected void onException(Throwable ex)
1324         {
1325             super.onException(ex);
1326             metaPublishFail(ex,this.getOutboundMessages());
1327             if (_disconnecting)
1328             {
1329                 try{stop();}catch(Exception e){Log.ignore(e);}
1330             }
1331         }
1332     }
1333 
1334     /* ------------------------------------------------------------ */
1335     public void addListener(ClientListener listener)
1336     {
1337         synchronized (_inQ)
1338         {
1339             boolean added=false;
1340             if (listener instanceof MessageListener)
1341             {
1342                 added=true;
1343                 if (_mListeners == null)
1344                     _mListeners = new ArrayList<MessageListener>();
1345                 _mListeners.add((MessageListener)listener);
1346             }
1347             if (listener instanceof RemoveListener)
1348             {
1349                 added=true;
1350                 if (_rListeners == null)
1351                     _rListeners = new ArrayList<RemoveListener>();
1352                 _rListeners.add((RemoveListener)listener);
1353             }
1354 
1355             if (!added)
1356                 throw new IllegalArgumentException();
1357         }
1358     }
1359 
1360     /* ------------------------------------------------------------ */
1361     public void removeListener(ClientListener listener)
1362     {
1363         synchronized (_inQ)
1364         {
1365             if (listener instanceof MessageListener)
1366             {
1367                 if (_mListeners != null)
1368                     _mListeners.remove((MessageListener)listener);
1369             }
1370             if (listener instanceof RemoveListener)
1371             {
1372                 if (_rListeners != null)
1373                     _rListeners.remove((RemoveListener)listener);
1374             }
1375         }
1376     }
1377 
1378     /* ------------------------------------------------------------ */
1379     public int getMaxQueue()
1380     {
1381         return -1;
1382     }
1383 
1384     /* ------------------------------------------------------------ */
1385     public Queue<Message> getQueue()
1386     {
1387         return _inQ;
1388     }
1389 
1390     /* ------------------------------------------------------------ */
1391     public void setMaxQueue(int max)
1392     {
1393         if (max != -1)
1394             throw new UnsupportedOperationException();
1395     }
1396 
1397     /* ------------------------------------------------------------ */
1398     /**
1399      * Send the exchange, possibly using a backoff.
1400      *
1401      * @param exchange
1402      * @param backoff
1403      *            if true, use backoff algorithm to send
1404      * @return
1405      */
1406     protected boolean send(final Exchange exchange, final boolean backoff)
1407     {
1408         long interval = (_advice != null?_advice.getInterval():0);
1409 
1410         if (backoff)
1411         {
1412             int backoffInterval = exchange.getBackoff();
1413             if (Log.isDebugEnabled())
1414                 Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);
1415 
1416             exchange.incBackoff();
1417 
1418             interval += backoffInterval;
1419         }
1420 
1421         if (interval > 0)
1422         {
1423             TimerTask task = new TimerTask()
1424             {
1425                 public void run()
1426                 {
1427                     try
1428                     {
1429                         send(exchange);
1430                     }
1431                     catch (IOException e)
1432                     {
1433                         Log.warn("Delayed send, retry: "+e);
1434                         Log.debug(e);
1435                         send(exchange,true);
1436                     }
1437                     catch (IllegalStateException e)
1438                     {
1439                         Log.debug(e);
1440                         if (isRunning())
1441                         {
1442                             Log.warn("Delayed send, retry: "+e);
1443                             send(exchange,true);
1444                         }
1445                     }
1446                 }
1447             };
1448             if (Log.isDebugEnabled())
1449                 Log.debug("Delay " + interval + " send of " + exchange);
1450             _timer.schedule(task,interval);
1451         }
1452         else
1453         {
1454             try
1455             {
1456                 send(exchange);
1457             }
1458             catch (IOException e)
1459             {
1460                 Log.warn("Send, retry on fail: "+e);
1461                 Log.debug(e);
1462                 return send(exchange,true);
1463             }
1464             catch (IllegalStateException e)
1465             {
1466                 Log.warn("Send, retry on fail: "+e);
1467                 Log.debug(e);
1468                 return send(exchange,true);
1469             }
1470         }
1471         return true;
1472 
1473     }
1474 
1475     /* ------------------------------------------------------------ */
1476     /**
1477      * Send the exchange.
1478      *
1479      * @param exchange
1480      * @throws IOException
1481      */
1482     protected void send(HttpExchange exchange) throws IOException
1483     {
1484         exchange.reset(); // ensure at start state
1485         customize(exchange);
1486         if (Log.isDebugEnabled())
1487             Log.debug("Send: using any connection=" + exchange);
1488         _httpClient.send(exchange); // use any connection
1489     }
1490 
1491     /* ------------------------------------------------------------ */
1492     /**
1493      * False when we have received a success=false message in response to a
1494      * Connect, or we have had an exception when sending or receiving a Connect.
1495      *
1496      * True when handshake and then connect has happened.
1497      *
1498      * @param b
1499      */
1500     protected void setInitialized(boolean b)
1501     {
1502         synchronized (_outQ)
1503         {
1504             _initialized = b;
1505         }
1506     }
1507 
1508     /* ------------------------------------------------------------ */
1509     protected boolean isInitialized()
1510     {
1511         return _initialized;
1512     }
1513 
1514     /* ------------------------------------------------------------ */
1515     /**
1516      * Called with the results of a /meta/connect message
1517      * @param success connect was returned with this status
1518      */
1519     protected void metaConnect(boolean success, Message message)
1520     {
1521         if (!success)
1522             Log.warn(this.toString()+" "+message.toString());
1523     }
1524 
1525     /* ------------------------------------------------------------ */
1526     /**
1527      * Called with the results of a /meta/handshake message
1528      * @param success connect was returned with this status
1529      * @param reestablish the client was previously connected.
1530      */
1531     protected void metaHandshake(boolean success, boolean reestablish, Message message)
1532     {
1533         if (!success)
1534             Log.warn(this.toString()+" "+message.toString());
1535     }
1536 
1537     /* ------------------------------------------------------------ */
1538     /**
1539      * Called with the results of a failed publish
1540      */
1541     protected void metaPublishFail(Throwable e, Message[] messages)
1542     {
1543         Log.warn(this.toString()+": "+e);
1544         Log.debug(e);
1545     }
1546 
1547     /* ------------------------------------------------------------ */
1548     /** Called to extend outbound string messages.
1549      * Some messages are sent as preformatted JSON strings (eg handshake
1550      * and connect messages).  This extendOut method is a variation of the
1551      * {@link #extendOut(Message)} method to efficiently cater for these
1552      * preformatted strings.
1553      * <p>
1554      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1555      *
1556      * @param msg
1557      * @return the extended message
1558      */
1559     protected String extendOut(String msg)
1560     {
1561         if (_extensions==null)
1562             return msg;
1563 
1564         try
1565         {
1566             Message[] messages = _msgPool.parse(msg);
1567             for (int i=0; i<messages.length; i++)
1568                 extendOut(messages[i]);
1569             if (messages.length==1 && msg.charAt(0)=='{')
1570                 return _msgPool.getMsgJSON().toJSON(messages[0]);
1571             return _msgPool.getMsgJSON().toJSON(messages);
1572         }
1573         catch(IOException e)
1574         {
1575             Log.warn(e);
1576             return msg;
1577         }
1578     }
1579 
1580     /* ------------------------------------------------------------ */
1581     /** Called to extend outbound messages
1582      * <p>
1583      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1584      *
1585      */
1586     protected void extendOut(Message message)
1587     {
1588         if (_extensions!=null)
1589         {
1590             Message m = message;
1591             String channelId = m.getChannel();
1592             if (channelId != null)
1593             {
1594                 if (channelId.startsWith(Bayeux.META_SLASH))
1595                     for (int i=0;m!=null && i<_extensions.length;i++)
1596                         m=_extensions[i].sendMeta(this,m);
1597                 else
1598                     for (int i=0;m!=null && i<_extensions.length;i++)
1599                         m=_extensions[i].send(this,m);
1600             }
1601 
1602             if (message!=m)
1603             {
1604                 message.clear();
1605                 if (m!=null)
1606                     for (Map.Entry<String,Object> entry:m.entrySet())
1607                         message.put(entry.getKey(),entry.getValue());
1608             }
1609         }
1610     }
1611 
1612     /* ------------------------------------------------------------ */
1613     /** Called to extend inbound messages
1614      * <p>
1615      * This method calls the {@link Extension}s added by {@link #addExtension(Extension)}
1616      *
1617      */
1618     protected void extendIn(Message message)
1619     {
1620         if (_extensions!=null)
1621         {
1622             Message m = message;
1623             String channelId = m.getChannel();
1624             if (channelId != null)
1625             {
1626                 if (channelId.startsWith(Bayeux.META_SLASH))
1627                     for (int i=_extensions.length;m!=null && i-->0;)
1628                         m=_extensions[i].rcvMeta(this,m);
1629                 else
1630                     for (int i=_extensions.length;m!=null && i-->0;)
1631                         m=_extensions[i].rcv(this,m);
1632             }
1633 
1634             if (message!=m)
1635             {
1636                 message.clear();
1637                 if (m!=null)
1638                     for (Map.Entry<String,Object> entry:m.entrySet())
1639                         message.put(entry.getKey(),entry.getValue());
1640             }
1641         }
1642     }
1643 
1644     private static class ExpirableCookie
1645     {
1646         private final Cookie cookie;
1647         private final long expirationTime;
1648 
1649         private ExpirableCookie(Cookie cookie, long expirationTime)
1650         {
1651             this.cookie = cookie;
1652             this.expirationTime = expirationTime;
1653         }
1654 
1655         private boolean isExpired()
1656         {
1657             if (expirationTime < 0) return false;
1658             return System.currentTimeMillis() >= expirationTime;
1659         }
1660     }
1661 }