1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.client;
16
17 import java.io.IOException;
18 import java.net.URLEncoder;
19 import java.text.ParseException;
20 import java.text.SimpleDateFormat;
21 import java.util.ArrayList;
22 import java.util.Date;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Queue;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.TimeUnit;
31 import javax.servlet.http.Cookie;
32
33 import org.cometd.Bayeux;
34 import org.cometd.Client;
35 import org.cometd.ClientListener;
36 import org.cometd.Extension;
37 import org.cometd.Message;
38 import org.cometd.MessageListener;
39 import org.cometd.RemoveListener;
40 import org.mortbay.cometd.MessageImpl;
41 import org.mortbay.cometd.MessagePool;
42 import org.mortbay.component.AbstractLifeCycle;
43 import org.mortbay.io.Buffer;
44 import org.mortbay.io.ByteArrayBuffer;
45 import org.mortbay.jetty.HttpHeaders;
46 import org.mortbay.jetty.HttpSchemes;
47 import org.mortbay.jetty.HttpURI;
48 import org.mortbay.jetty.client.Address;
49 import org.mortbay.jetty.client.ContentExchange;
50 import org.mortbay.jetty.client.HttpClient;
51 import org.mortbay.jetty.client.HttpExchange;
52 import org.mortbay.log.Log;
53 import org.mortbay.util.ArrayQueue;
54 import org.mortbay.util.LazyList;
55 import org.mortbay.util.QuotedStringTokenizer;
56 import org.mortbay.util.ajax.JSON;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class BayeuxClient extends AbstractLifeCycle implements Client
72 {
/** HttpClient attribute name under which doStart() looks up / stores the shared {@link Timer}. */
private final static String __TIMER="org.mortbay.cometd.client.Timer";
/** HttpClient attribute name under which doStart() looks up / stores the shared {@link JSON} writer. */
private final static String __JSON="org.mortbay.cometd.client.JSON";
/** HttpClient attribute name under which doStart() looks up / stores the shared {@link MessagePool}. */
private final static String __MSGPOOL="org.mortbay.cometd.MessagePool";
/** HTTP client used to send every exchange; must already be started when this client starts. */
protected HttpClient _httpClient;

/** Pool used to create and parse messages. */
protected MessagePool _msgPool;
/** Inbound messages queued while no message listener is registered; also the lock guarding the listener fields. */
private ArrayQueue<Message> _inQ = new ArrayQueue<Message>();
/** Outbound messages waiting for the next Publish; also the lock guarding _batch, _push, _initialized and _disconnecting updates. */
private ArrayQueue<Message> _outQ = new ArrayQueue<Message>();
/** Host/port of the cometd server. */
protected Address _cometdAddress;
/** The current handshake or connect exchange, or null. */
private Exchange _pull;
/** The in-flight publish exchange, or null when none is outstanding. */
private Exchange _push;
/** Base path of the cometd servlet; "/" plus the operation name is appended per exchange. */
private String _path = "/cometd";
/** True once a connect has succeeded; publishes are only flushed while this is set. */
private boolean _initialized = false;
/** True once disconnect() has queued the disconnect message. */
private boolean _disconnecting = false;
/** Records whether the last handshake response was successful. */
private boolean _handshook = false;
/** Client id taken from the successful handshake response. */
private String _clientId;
/** The single listener installed through setListener(). */
private org.cometd.Listener _listener;
/** Registered remove listeners; created lazily by addListener(). */
private List<RemoveListener> _rListeners;
/** Registered message listeners; created lazily by addListener(). */
private List<MessageListener> _mListeners;
/** Batch nesting depth maintained by startBatch()/endBatch(). */
private int _batch;
/** Selects form-encoded request bodies instead of text/json; never assigned in this class. */
private boolean _formEncoded;
/** Cookies received from the server, keyed by cookie name. */
private Map<String, ExpirableCookie> _cookies = new ConcurrentHashMap<String, ExpirableCookie>();
/** Most recent advice received in a handshake or connect response. */
private Advice _advice;
/** Timer used to schedule delayed sends. */
private Timer _timer;
/** True when doStart() created the timer itself; doStop() then cancels it. */
private boolean _ownTimer;
/** Initial backoff (ms) given to each new exchange. */
private int _backoffInterval = 0;
/** Amount (ms) added to an exchange's backoff on each backoff send. */
private int _backoffIncrement = 1000;
/** Upper bound (ms) for an exchange's backoff. */
private int _backoffMaxInterval = 60000;
/** Scheme applied to exchanges in customize(); null leaves the exchange's own scheme. */
private Buffer _scheme;
/** Installed extensions, or null when none. */
protected Extension[] _extensions;
/** JSON writer used by Publish to serialise outbound messages. */
protected JSON _jsonOut;
104
105
/**
 * Creates a client for the given cometd URL without supplying a timer.
 *
 * @param client the HttpClient used to send requests
 * @param url the cometd URL
 */
public BayeuxClient(HttpClient client, String url)
{
    this(client,url,null);
}
110
111
112 public BayeuxClient(HttpClient client, String url, Timer timer)
113 {
114 HttpURI uri = new HttpURI(url);
115 _httpClient = client;
116 _cometdAddress = new Address(uri.getHost(),uri.getPort());
117 _path=uri.getPath();
118 _timer = timer;
119 _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER;
120 }
121
122
/**
 * Creates a client from an explicit server address and path.
 * The scheme field is not set by this constructor.
 *
 * @param client the HttpClient used to send requests
 * @param address the address of the cometd server
 * @param path the path of the cometd servlet
 * @param timer the timer used for delayed sends, or null to have one chosen in doStart()
 */
public BayeuxClient(HttpClient client, Address address, String path, Timer timer)
{
    _httpClient = client;
    _cometdAddress = address;
    _path = path;
    _timer = timer;
}
130
131
/**
 * Creates a client from an explicit server address and path without supplying a timer.
 *
 * @param client the HttpClient used to send requests
 * @param address the address of the cometd server
 * @param uri the path of the cometd servlet
 */
public BayeuxClient(HttpClient client, Address address, String uri)
{
    this(client,address,uri,null);
}
136
137
/**
 * Appends an extension to the extension array.
 *
 * @param ext the extension to add
 */
public void addExtension(Extension ext)
{
    _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
}
142
/**
 * Removes an extension from the extension array.
 *
 * @param extension the extension to remove
 */
public void removeExtension(Extension extension)
{
    _extensions = (Extension[])LazyList.removeFromArray(_extensions,extension);
}
147
148
/**
 * @return the internal extension array itself (not a copy); null when no extension was ever added
 */
Extension[] getExtensions()
{
    return _extensions;
}
153
154
155
156
157
158
159
160
/**
 * Sets the initial backoff that each newly created exchange starts with.
 *
 * @param interval the initial backoff, in milliseconds
 */
public void setBackOffInterval(int interval)
{
    _backoffInterval = interval;
}
165
166
167
168
169
170
/**
 * @return the initial backoff, in milliseconds, given to each new exchange
 */
public int getBackoffInterval()
{
    return _backoffInterval;
}
175
176
177
178
179
180
/**
 * No-op: the supplied value is ignored and nothing is stored.
 *
 * @param retries ignored
 */
public void setBackoffMaxRetries(int retries)
{
}
184
185
186
187
188
/**
 * @return always -1; no retry limit is tracked by this class
 */
public int getBackoffMaxRetries()
{
    return -1;
}
193
194
195
196
197
198
199
/**
 * Sets the amount added to an exchange's backoff each time it is sent with backoff.
 *
 * @param interval the increment, in milliseconds
 */
public void setBackoffIncrement(int interval)
{
    _backoffIncrement = interval;
}
204
205
206
207
208
209
/**
 * @return the backoff increment, in milliseconds
 */
public int getBackoffIncrement()
{
    return _backoffIncrement;
}
214
215
/**
 * Sets the ceiling applied to an exchange's backoff.
 *
 * @param interval the maximum backoff, in milliseconds
 */
public void setBackoffMaxInterval(int interval)
{
    _backoffMaxInterval = interval;
}
220
/**
 * @return the maximum backoff, in milliseconds
 */
public int getBackoffMaxInterval()
{
    return _backoffMaxInterval;
}
225
226
227
228
229
230
231
/**
 * @return the client id taken from the last successful handshake response;
 *         null until a handshake has succeeded
 */
public String getId()
{
    return _clientId;
}
236
237
238 protected void doStart() throws Exception
239 {
240 if (!_httpClient.isStarted())
241 throw new IllegalStateException("!HttpClient.isStarted()");
242
243 synchronized (_httpClient)
244 {
245 if (_jsonOut == null)
246 {
247 _jsonOut = (JSON)_httpClient.getAttribute(__JSON);
248 if (_jsonOut==null)
249 {
250 _jsonOut = new JSON();
251 _httpClient.setAttribute(__JSON,_jsonOut);
252 }
253 }
254
255 if (_timer == null)
256 {
257 _timer = (Timer)_httpClient.getAttribute(__TIMER);
258 if (_timer==null)
259 {
260 _ownTimer=true;
261 _timer = new Timer(__TIMER+"@"+hashCode(),true);
262 _httpClient.setAttribute(__TIMER,_timer);
263 }
264 }
265
266 if (_msgPool == null)
267 {
268 _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL);
269 if (_msgPool==null)
270 {
271 _msgPool = new MessagePool();
272 _httpClient.setAttribute(__MSGPOOL,_msgPool);
273 }
274 }
275 }
276 _disconnecting=false;
277 _pull=null;
278 _push=null;
279 super.doStart();
280 synchronized (_outQ)
281 {
282 if (!_initialized && _pull == null)
283 {
284 _pull = new Handshake();
285 send((Exchange)_pull,false);
286 }
287 }
288 }
289
290
291 protected void doStop() throws Exception
292 {
293 if (_ownTimer)
294 {
295 _timer.cancel();
296 _timer=null;
297 }
298
299 if (!_disconnecting)
300 disconnect();
301 super.doStop();
302 }
303
304
/**
 * @return true when the client is running and a handshake/connect exchange is currently assigned
 */
public boolean isPolling()
{
    synchronized (_outQ)
    {
        return isRunning() && (_pull != null);
    }
}
312
313
314
315
316
317 public void deliver(Client from, Message message)
318 {
319 if (!isRunning())
320 throw new IllegalStateException("Not running");
321
322 synchronized (_inQ)
323 {
324 if (_mListeners == null)
325 _inQ.add(message);
326 else
327 {
328 for (MessageListener l : _mListeners)
329 {
330 if (l instanceof MessageListener.Synchronous)
331 notifyMessageListener(l, from, message);
332 }
333 }
334 }
335
336 if (_mListeners !=null)
337 for (MessageListener l : _mListeners)
338 if (!(l instanceof MessageListener.Synchronous))
339 notifyMessageListener(l, from, message);
340 }
341
/**
 * Invokes one message listener, logging (at debug level) anything it
 * throws so that a failing listener does not affect the caller.
 *
 * @param listener the listener to invoke
 * @param from the sending client passed through to the listener
 * @param message the message passed through to the listener
 */
private void notifyMessageListener(MessageListener listener, Client from, Message message)
{
    try
    {
        listener.deliver(from, this, message);
    }
    catch (Throwable x)
    {
        Log.debug(x);
    }
}
353
354
355
356
357
358
359
360
361 public void deliver(Client from, String toChannel, Object data, String id)
362 {
363 if (!isRunning())
364 throw new IllegalStateException("Not running");
365
366 MessageImpl message = _msgPool.newMessage();
367
368 message.put(Bayeux.CHANNEL_FIELD,toChannel);
369 message.put(Bayeux.DATA_FIELD,data);
370 if (id != null)
371 message.put(Bayeux.ID_FIELD,id);
372
373 synchronized (_inQ)
374 {
375 if (_mListeners == null)
376 {
377 message.incRef();
378 _inQ.add(message);
379 }
380 else
381 {
382 for (MessageListener l : _mListeners)
383 if (l instanceof MessageListener.Synchronous)
384 notifyMessageListener(l, from, message);
385 }
386 }
387
388 if (_mListeners !=null)
389 for (MessageListener l : _mListeners)
390 if (!(l instanceof MessageListener.Synchronous))
391 notifyMessageListener(l, from, message);
392
393 message.decRef();
394 }
395
396
397
398
399
/**
 * @return the listener installed with setListener(), or null
 */
public org.cometd.Listener getListener()
{
    synchronized (_inQ)
    {
        return _listener;
    }
}
407
408
409
410
411
412
413
414 public boolean hasMessages()
415 {
416 synchronized (_inQ)
417 {
418 return _inQ.size() > 0;
419 }
420 }
421
422
423
424
425
426
427
/**
 * @return always false
 */
public boolean isLocal()
{
    return false;
}
432
433
434
435
436
437
438
439 private void publish(MessageImpl msg)
440 {
441 msg.incRef();
442 synchronized (_outQ)
443 {
444 _outQ.add(msg);
445
446 if (_batch == 0 && _initialized && _push == null)
447 {
448 _push = new Publish();
449 try
450 {
451 send(_push);
452 }
453 catch (IOException e)
454 {
455 metaPublishFail(e,((Publish)_push).getOutboundMessages());
456 }
457 catch (IllegalStateException e)
458 {
459 metaPublishFail(e,((Publish)_push).getOutboundMessages());
460 }
461 }
462 }
463 }
464
465
466
467
468
469
470
471
472 public void publish(String toChannel, Object data, String msgId)
473 {
474 if (!isRunning() || _disconnecting)
475 throw new IllegalStateException("Not running");
476
477 MessageImpl msg = _msgPool.newMessage();
478 msg.put(Bayeux.CHANNEL_FIELD,toChannel);
479 msg.put(Bayeux.DATA_FIELD,data);
480 if (msgId != null)
481 msg.put(Bayeux.ID_FIELD,msgId);
482 publish(msg);
483 msg.decRef();
484 }
485
486
487
488
489
490
491
492 public void subscribe(String toChannel)
493 {
494 if (!isRunning() || _disconnecting)
495 throw new IllegalStateException("Not running");
496
497 MessageImpl msg = _msgPool.newMessage();
498 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
499 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
500 publish(msg);
501 msg.decRef();
502 }
503
504
505
506
507
508
509
510 public void unsubscribe(String toChannel)
511 {
512 if (!isRunning() || _disconnecting)
513 throw new IllegalStateException("Not running");
514
515 MessageImpl msg = _msgPool.newMessage();
516 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
517 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
518 publish(msg);
519 msg.decRef();
520 }
521
522
523
524
525
526
/**
 * Removes this client by disconnecting it; identical to calling disconnect().
 *
 * @throws IllegalStateException under the same conditions as disconnect()
 */
public void remove()
{
    disconnect();
}
531
532
533
534
535
536 public void disconnect()
537 {
538 if (isStopped() || _disconnecting)
539 throw new IllegalStateException("Not running");
540
541 MessageImpl msg = _msgPool.newMessage();
542 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
543
544 synchronized (_outQ)
545 {
546 _outQ.add(msg);
547 _disconnecting = true;
548 if (_batch == 0 && _initialized && _push == null)
549 {
550 _push = new Publish();
551 try
552 {
553 send(_push);
554 }
555 catch (IOException e)
556 {
557 Log.warn(e.toString());
558 Log.debug(e);
559 send(_push,true);
560 }
561 }
562 _initialized = false;
563 }
564 }
565
566
567
568
569
570 public void setListener(org.cometd.Listener listener)
571 {
572 synchronized (_inQ)
573 {
574 if (_listener != null)
575 removeListener(_listener);
576 _listener = listener;
577 if (_listener != null)
578 addListener(_listener);
579 }
580 }
581
582
583
584
585
586
587
588
589 public List<Message> takeMessages()
590 {
591 final LinkedList<Message> list;
592 synchronized (_inQ)
593 {
594 list = new LinkedList<Message>(_inQ);
595 _inQ.clear();
596 }
597 for (Message m : list)
598 if (m instanceof MessageImpl)
599 ((MessageImpl)m).decRef();
600 return list;
601 }
602
603
604
605
606
607
608
609 public void endBatch()
610 {
611 synchronized (_outQ)
612 {
613 if (--_batch <= 0)
614 {
615 _batch = 0;
616 if ((_initialized || _disconnecting) && _push == null && _outQ.size() > 0)
617 {
618 _push = new Publish();
619 try
620 {
621 send(_push);
622 }
623 catch (IOException e)
624 {
625 metaPublishFail(e,((Publish)_push).getOutboundMessages());
626 }
627 }
628 }
629 }
630 }
631
632
633
634
635
636
637
/**
 * Opens one batch level; queued messages are not flushed by publish()
 * while the batch depth is non-zero.
 *
 * @throws IllegalStateException if the client is stopped
 */
public void startBatch()
{
    if (isStopped())
        throw new IllegalStateException("Not running");

    synchronized (_outQ)
    {
        _batch++;
    }
}
648
649
650
651
652
653
654
655 protected void customize(HttpExchange exchange)
656 {
657 StringBuilder builder = null;
658 for (String cookieName : _cookies.keySet())
659 {
660 if (builder == null)
661 builder = new StringBuilder();
662 else
663 builder.append("; ");
664
665
666 Cookie cookie = getCookie(cookieName);
667 if (cookie != null)
668 {
669 builder.append(cookie.getName());
670 builder.append("=");
671 builder.append(cookie.getValue());
672 }
673 }
674
675 if (builder != null)
676 exchange.setRequestHeader(HttpHeaders.COOKIE,builder.toString());
677
678 if (_scheme!=null)
679 exchange.setScheme(_scheme);
680 }
681
682
683 public void setCookie(Cookie cookie)
684 {
685 long expirationTime = System.currentTimeMillis();
686 int maxAge = cookie.getMaxAge();
687 if (maxAge < 0)
688 expirationTime = -1L;
689 else
690 expirationTime += maxAge * 1000;
691
692 ExpirableCookie expirableCookie = new ExpirableCookie(cookie, expirationTime);
693 _cookies.put(cookie.getName(), expirableCookie);
694 }
695
696 public Cookie getCookie(String name)
697 {
698 ExpirableCookie cookie = _cookies.get(name);
699 if (cookie != null)
700 {
701 if (cookie.isExpired())
702 {
703 _cookies.remove(name);
704 cookie = null;
705 }
706 }
707 return cookie == null ? null : cookie.cookie;
708 }
709
710
711
712
713
714
715
716 protected class Exchange extends ContentExchange
717 {
718 Message[] _responses;
719 int _connectFailures;
720 int _backoff = _backoffInterval;
721 String _json;
722
723
724 Exchange(String info)
725 {
726 setMethod("POST");
727 setScheme(HttpSchemes.HTTP_BUFFER);
728 setAddress(_cometdAddress);
729 setURI(_path + "/" + info);
730 setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
731 }
732
733
734 public int getBackoff()
735 {
736 return _backoff;
737 }
738
739
740 public void incBackoff()
741 {
742 _backoff = Math.min(_backoff+_backoffIncrement,_backoffMaxInterval);
743 }
744
745
746 protected void setMessage(String message)
747 {
748 message=extendOut(message);
749 setJson(message);
750 }
751
752
753 protected void setJson(String json)
754 {
755 try
756 {
757 _json = json;
758
759 if (_formEncoded)
760 setRequestContent(new ByteArrayBuffer("message=" + URLEncoder.encode(_json,"utf-8")));
761 else
762 setRequestContent(new ByteArrayBuffer(_json,"utf-8"));
763 }
764 catch (Exception e)
765 {
766 Log.ignore(e);
767 setRequestContent(new ByteArrayBuffer(_json));
768 }
769 }
770
771
772 protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
773 {
774 super.onResponseStatus(version,status,reason);
775 }
776
777
778 protected void onResponseHeader(Buffer name, Buffer value) throws IOException
779 {
780 super.onResponseHeader(name,value);
781 if (!isRunning())
782 return;
783
784 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.SET_COOKIE_ORDINAL)
785 {
786 String cname = null;
787 String cvalue = null;
788
789 QuotedStringTokenizer tok = new QuotedStringTokenizer(value.toString(),"=;",false,false);
790 tok.setSingle(false);
791
792 if (tok.hasMoreElements())
793 cname = tok.nextToken();
794 if (tok.hasMoreElements())
795 cvalue = tok.nextToken();
796
797 Cookie cookie = new Cookie(cname,cvalue);
798
799 while (tok.hasMoreTokens())
800 {
801 String token = tok.nextToken();
802 if ("Version".equalsIgnoreCase(token))
803 cookie.setVersion(Integer.parseInt(tok.nextToken()));
804 else if ("Comment".equalsIgnoreCase(token))
805 cookie.setComment(tok.nextToken());
806 else if ("Path".equalsIgnoreCase(token))
807 cookie.setPath(tok.nextToken());
808 else if ("Domain".equalsIgnoreCase(token))
809 cookie.setDomain(tok.nextToken());
810 else if ("Expires".equalsIgnoreCase(token))
811 {
812 try
813 {
814 Date date = new SimpleDateFormat("EEE, dd-MMM-yy HH:mm:ss 'GMT'").parse(tok.nextToken());
815 Long maxAge = TimeUnit.MILLISECONDS.toSeconds(date.getTime() - System.currentTimeMillis());
816 cookie.setMaxAge(maxAge > 0 ? maxAge.intValue() : 0);
817 }
818 catch (ParseException ignored)
819 {
820 }
821 }
822 else if ("Max-Age".equalsIgnoreCase(token))
823 {
824 try
825 {
826 int maxAge = Integer.parseInt(tok.nextToken());
827 cookie.setMaxAge(maxAge);
828 }
829 catch (NumberFormatException ignored)
830 {
831 }
832 }
833 else if ("Secure".equalsIgnoreCase(token))
834 cookie.setSecure(true);
835 }
836
837 BayeuxClient.this.setCookie(cookie);
838 }
839 }
840
841
842 protected void onResponseComplete() throws IOException
843 {
844 if (!isRunning())
845 return;
846
847 super.onResponseComplete();
848
849 if (getResponseStatus() == 200)
850 {
851 String content = getResponseContent();
852
853 if (content == null || content.length() == 0)
854 throw new IllegalStateException();
855 _responses = _msgPool.parse(content);
856
857 if (_responses!=null)
858 for (int i=0;i<_responses.length;i++)
859 extendIn(_responses[i]);
860 }
861 }
862
863
864 protected void resend(boolean backoff)
865 {
866 if (!isRunning())
867 return;
868
869 final boolean disconnecting;
870 synchronized (_outQ)
871 {
872 disconnecting=_disconnecting;
873 }
874 if (disconnecting)
875 {
876 try{stop();}catch(Exception e){Log.ignore(e);}
877 return;
878 }
879
880 setJson(_json);
881 if (!send(this,backoff))
882 Log.warn("Retries exhausted");
883 }
884
885
886 protected void recycle()
887 {
888 if (_responses!=null)
889 for (Message msg:_responses)
890 if (msg instanceof MessageImpl)
891 ((MessageImpl)msg).decRef();
892 _responses=null;
893 }
894 }
895
896
897
898
899
900
901
902 protected class Handshake extends Exchange
903 {
904 public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]";
905
906 Handshake()
907 {
908 super("handshake");
909 setMessage(__HANDSHAKE);
910 }
911
912
913
914
915
916
917
918
919 protected void onResponseComplete() throws IOException
920 {
921 super.onResponseComplete();
922
923 if (!isRunning())
924 return;
925
926 if (_disconnecting)
927 {
928 Message error=_msgPool.newMessage();
929 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
930 error.put("failure","expired");
931 metaHandshake(false,false,error);
932 try{stop();}catch(Exception e){Log.ignore(e);}
933 return;
934 }
935
936 if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
937 {
938 MessageImpl response = (MessageImpl)_responses[0];
939 boolean successful = response.isSuccessful();
940
941
942 Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD);
943 if (adviceField != null)
944 _advice = new Advice(adviceField);
945
946 if (successful)
947 {
948 _handshook = true;
949 if (Log.isDebugEnabled())
950 Log.debug("Successful handshake, sending connect");
951 _clientId = (String)response.get(Bayeux.CLIENT_FIELD);
952
953 metaHandshake(true,_handshook,response);
954 _pull = new Connect();
955 send(_pull,false);
956 }
957 else
958 {
959 metaHandshake(false,false,response);
960 _handshook = false;
961 if (_advice != null && _advice.isReconnectNone())
962 throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]);
963 else if (_advice != null && _advice.isReconnectHandshake())
964 {
965 _pull = new Handshake();
966 if (!send(_pull,true))
967 throw new IOException("Handshake, retries exhausted");
968 }
969 else
970
971 {
972 _pull = new Connect();
973 if (!send(_pull,true))
974 throw new IOException("Connect after handshake, retries exhausted");
975 }
976 }
977 }
978 else
979 {
980 Message error=_msgPool.newMessage();
981 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
982 error.put("status",new Integer(getResponseStatus()));
983 error.put("content",getResponseContent());
984
985 metaHandshake(false,false,error);
986 resend(true);
987 }
988
989 recycle();
990 }
991
992
993 protected void onExpire()
994 {
995
996 Message error=_msgPool.newMessage();
997 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
998 error.put("failure","expired");
999 metaHandshake(false,false,error);
1000 resend(true);
1001 }
1002
1003
1004 protected void onConnectionFailed(Throwable ex)
1005 {
1006
1007 Message error=_msgPool.newMessage();
1008 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1009 error.put("failure",ex.toString());
1010 error.put("exception",ex);
1011 ex.printStackTrace();
1012 metaHandshake(false,false,error);
1013 resend(true);
1014 }
1015
1016
1017 protected void onException(Throwable ex)
1018 {
1019
1020 Message error=_msgPool.newMessage();
1021 error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1022 error.put("failure",ex.toString());
1023 error.put("exception",ex);
1024 metaHandshake(false,false,error);
1025 resend(true);
1026 }
1027 }
1028
1029
1030
1031
1032
1033
/**
 * The long-polling connect exchange. A successful connect response marks
 * the client initialized, flushes queued messages and issues the next
 * connect; a failed one follows the server's reconnect advice. Every
 * response message that is not skipped is delivered to this client.
 */
protected class Connect extends Exchange
{
    /** The JSON connect request built in the constructor from the current client id. */
    String _connectString;

    Connect()
    {
        super("connect");
        _connectString = "[{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}]";
        setMessage(_connectString);
    }

    /**
     * Processes the connect response inside a batch, so anything published
     * while handling the messages is flushed once at the end.
     *
     * @throws IOException if the connect failed and the advice is not to reconnect
     */
    protected void onResponseComplete() throws IOException
    {
        super.onResponseComplete();
        if (!isRunning())
            return;

        if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
        {
            try
            {
                startBatch();

                for (int i = 0; i < _responses.length; i++)
                {
                    Message msg = _responses[i];

                    // Any message may carry advice; the latest one wins.
                    Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
                    if (adviceField != null)
                        _advice = new Advice(adviceField);

                    if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
                    {
                        Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
                        if (successful != null && successful.booleanValue())
                        {
                            metaConnect(true,msg);

                            if (!isRunning())
                                break;

                            synchronized (_outQ)
                            {
                                // While disconnecting: no re-initialization, no new
                                // connect, and this message is not delivered.
                                if (_disconnecting)
                                    continue;

                                if (!isInitialized())
                                {
                                    setInitialized(true);
                                    {
                                        // First successful connect: flush what was queued meanwhile.
                                        if (_outQ.size() > 0)
                                        {
                                            _push = new Publish();
                                            send(_push);
                                        }
                                    }
                                }

                            }

                            // Issue the next long poll.
                            _pull = new Connect();
                            send(_pull,false);
                        }
                        else
                        {
                            // Unsuccessful connect: stop flushing publishes until a
                            // later connect succeeds.
                            setInitialized(false);
                            metaConnect(false,msg);

                            synchronized(_outQ)
                            {
                                if (!isRunning()||_disconnecting)
                                    break;
                            }

                            if (_advice != null && _advice.isReconnectNone())
                                throw new IOException("Connect failed, advice reconnect=none");
                            else if (_advice != null && _advice.isReconnectHandshake())
                            {
                                if (Log.isDebugEnabled())
                                    Log.debug("connect received success=false, advice is to rehandshake");
                                _pull = new Handshake();
                                send(_pull,true);
                            }
                            else
                            {
                                // No usable advice: resend this connect with backoff.
                                if (Log.isDebugEnabled())
                                    Log.debug("Assuming retry=reconnect");
                                resend(true);
                            }
                        }
                    }
                    deliver(null,msg);
                }
            }
            finally
            {
                endBatch();
            }
        }
        else
        {
            Message error=_msgPool.newMessage();
            error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
            error.put("status",getResponseStatus());
            error.put("content",getResponseContent());
            metaConnect(false,error);
            resend(true);
        }

        // NOTE(review): not reached when an IOException is thrown above.
        recycle();
    }

    /** Marks the client uninitialized, reports the expiry through metaConnect() and resends with backoff. */
    protected void onExpire()
    {

        setInitialized(false);
        Message error=_msgPool.newMessage();
        error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
        error.put("failure","expired");
        metaConnect(false,error);
        resend(true);
    }

    /** Marks the client uninitialized, reports the connection failure through metaConnect() and resends with backoff. */
    protected void onConnectionFailed(Throwable ex)
    {

        setInitialized(false);
        Message error=_msgPool.newMessage();
        error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
        error.put("failure",ex.toString());
        error.put("exception",ex);
        metaConnect(false,error);
        resend(true);
    }

    /** Marks the client uninitialized, reports the exception through metaConnect() and resends with backoff. */
    protected void onException(Throwable ex)
    {

        setInitialized(false);
        Message error=_msgPool.newMessage();
        error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
        error.put("failure",ex.toString());
        error.put("exception",ex);
        metaConnect(false,error);
        resend(true);
    }
}
1195
1196
1197
1198
1199
1200
/**
 * The publish exchange. Its constructor drains the outbound queue into a
 * single JSON array; the response messages are delivered to this client.
 */
protected class Publish extends Exchange
{
    /**
     * Serialises and removes every message currently in the outbound queue.
     * If the queue is empty the constructor returns without setting a
     * request body.
     */
    Publish()
    {
        super("publish");

        StringBuffer json = new StringBuffer(256);
        synchronized (json)
        {
            synchronized (_outQ)
            {
                int s=_outQ.size();
                if (s == 0)
                    return;

                for (int i=0;i<s;i++)
                {
                    Message message = _outQ.getUnsafe(i);
                    message.put(Bayeux.CLIENT_FIELD,_clientId);
                    extendOut(message);

                    json.append(i==0?'[':',');
                    _jsonOut.append(json,message);

                    // The message has been serialised; release the queue's reference.
                    if (message instanceof MessageImpl)
                        ((MessageImpl)message).decRef();
                }
                json.append(']');
                _outQ.clear();
                setJson(json.toString());
            }
        }
    }

    /**
     * Re-parses the JSON request body into messages, for failure reporting.
     *
     * @return the outbound messages, or null if the body cannot be parsed
     */
    protected Message[] getOutboundMessages()
    {
        try
        {
            return _msgPool.parse(_json);
        }
        catch (IOException e)
        {
            Log.warn("Error converting outbound messages");
            if (Log.isDebugEnabled())
                Log.debug(e);
            return null;
        }
    }

    /**
     * Clears the in-flight publish marker and delivers the response
     * messages inside a batch. A successful disconnect response stops the
     * client and ends processing of the remaining messages.
     */
    protected void onResponseComplete() throws IOException
    {
        if (!isRunning())
            return;

        super.onResponseComplete();
        try
        {
            synchronized (_outQ)
            {
                startBatch();
                _push = null;
            }

            if (getResponseStatus() == 200 && _responses != null && _responses.length > 0)
            {
                for (int i = 0; i < _responses.length; i++)
                {
                    MessageImpl msg = (MessageImpl)_responses[i];

                    deliver(null,msg);
                    if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful())
                    {
                        if (isStarted())
                        {
                            try{stop();}catch(Exception e){Log.ignore(e);}
                        }
                        break;
                    }
                }
            }
            else
            {
                Log.warn("Publish, error=" + getResponseStatus());
            }
        }
        finally
        {
            endBatch();
        }
        recycle();
    }

    /**
     * Reports the expired publish through metaPublishFail(); stops the client when disconnecting.
     * NOTE(review): _push is not cleared on this path, so later publishes look blocked until a publish response completes - confirm.
     */
    protected void onExpire()
    {
        super.onExpire();
        metaPublishFail(null,this.getOutboundMessages());
        if (_disconnecting)
        {
            try{stop();}catch(Exception e){Log.ignore(e);}
        }
    }

    /** Reports the connection failure through metaPublishFail(); stops the client when disconnecting. */
    protected void onConnectionFailed(Throwable ex)
    {
        super.onConnectionFailed(ex);
        metaPublishFail(ex,this.getOutboundMessages());
        if (_disconnecting)
        {
            try{stop();}catch(Exception e){Log.ignore(e);}
        }
    }

    /** Reports the exception through metaPublishFail(); stops the client when disconnecting. */
    protected void onException(Throwable ex)
    {
        super.onException(ex);
        metaPublishFail(ex,this.getOutboundMessages());
        if (_disconnecting)
        {
            try{stop();}catch(Exception e){Log.ignore(e);}
        }
    }
}
1333
1334
1335 public void addListener(ClientListener listener)
1336 {
1337 synchronized (_inQ)
1338 {
1339 boolean added=false;
1340 if (listener instanceof MessageListener)
1341 {
1342 added=true;
1343 if (_mListeners == null)
1344 _mListeners = new ArrayList<MessageListener>();
1345 _mListeners.add((MessageListener)listener);
1346 }
1347 if (listener instanceof RemoveListener)
1348 {
1349 added=true;
1350 if (_rListeners == null)
1351 _rListeners = new ArrayList<RemoveListener>();
1352 _rListeners.add((RemoveListener)listener);
1353 }
1354
1355 if (!added)
1356 throw new IllegalArgumentException();
1357 }
1358 }
1359
1360
1361 public void removeListener(ClientListener listener)
1362 {
1363 synchronized (_inQ)
1364 {
1365 if (listener instanceof MessageListener)
1366 {
1367 if (_mListeners != null)
1368 _mListeners.remove((MessageListener)listener);
1369 }
1370 if (listener instanceof RemoveListener)
1371 {
1372 if (_rListeners != null)
1373 _rListeners.remove((RemoveListener)listener);
1374 }
1375 }
1376 }
1377
1378
/**
 * @return always -1; setMaxQueue() accepts no other value
 */
public int getMaxQueue()
{
    return -1;
}
1383
1384
/**
 * @return the internal inbound message queue itself (not a copy)
 */
public Queue<Message> getQueue()
{
    return _inQ;
}
1389
1390
/**
 * Accepts only -1, which leaves the client unchanged.
 *
 * @param max must be -1
 * @throws UnsupportedOperationException for any other value
 */
public void setMaxQueue(int max)
{
    if (max != -1)
        throw new UnsupportedOperationException();
}
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
/**
 * Sends an exchange, optionally delayed.
 * <p>
 * The delay is the interval from the current advice (if any) plus, when
 * backoff is requested, the exchange's current backoff; the exchange's
 * backoff is then increased for next time. A positive delay schedules the
 * send on the timer; otherwise the send happens immediately. A failed send
 * is retried through this method with backoff.
 *
 * @param exchange the exchange to send
 * @param backoff true to add the exchange's backoff to the delay
 * @return true once the send has been made or scheduled
 */
protected boolean send(final Exchange exchange, final boolean backoff)
{
    long interval = (_advice != null?_advice.getInterval():0);

    if (backoff)
    {
        int backoffInterval = exchange.getBackoff();
        if (Log.isDebugEnabled())
            Log.debug("Send with backoff, interval=" + backoffInterval + " for " + exchange);

        exchange.incBackoff();

        interval += backoffInterval;
    }

    if (interval > 0)
    {
        TimerTask task = new TimerTask()
        {
            public void run()
            {
                try
                {
                    send(exchange);
                }
                catch (IOException e)
                {
                    Log.warn("Delayed send, retry: "+e);
                    Log.debug(e);
                    send(exchange,true);
                }
                catch (IllegalStateException e)
                {
                    Log.debug(e);
                    // Only retry while running; after a stop the failure is expected.
                    if (isRunning())
                    {
                        Log.warn("Delayed send, retry: "+e);
                        send(exchange,true);
                    }
                }
            }
        };
        if (Log.isDebugEnabled())
            Log.debug("Delay " + interval + " send of " + exchange);
        _timer.schedule(task,interval);
    }
    else
    {
        try
        {
            send(exchange);
        }
        catch (IOException e)
        {
            Log.warn("Send, retry on fail: "+e);
            Log.debug(e);
            return send(exchange,true);
        }
        catch (IllegalStateException e)
        {
            Log.warn("Send, retry on fail: "+e);
            Log.debug(e);
            return send(exchange,true);
        }
    }
    // NOTE(review): no path in this method returns false - confirm whether
    // the callers' "retries exhausted" branches are reachable.
    return true;

}
1474
1475
1476
1477
1478
1479
1480
1481
/**
 * Sends an exchange immediately: resets it, applies customize() (cookies
 * and scheme) and hands it to the HttpClient.
 *
 * @param exchange the exchange to send
 * @throws IOException if the HttpClient fails to send the exchange
 */
protected void send(HttpExchange exchange) throws IOException
{
    exchange.reset();
    customize(exchange);
    if (Log.isDebugEnabled())
        Log.debug("Send: using any connection=" + exchange);
    _httpClient.send(exchange);
}
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
/**
 * Sets the initialized flag under the outbound-queue lock.
 *
 * @param b the new value of the flag
 */
protected void setInitialized(boolean b)
{
    synchronized (_outQ)
    {
        _initialized = b;
    }
}
1507
1508
/**
 * @return the initialized flag; read without taking the outbound-queue lock
 */
protected boolean isInitialized()
{
    return _initialized;
}
1513
1514
1515
1516
1517
1518
/**
 * Hook called by the Connect exchange for each connect outcome.
 * This implementation logs a warning for failures and ignores successes.
 *
 * @param success whether the connect succeeded
 * @param message the connect response, or a locally built error message
 */
protected void metaConnect(boolean success, Message message)
{
    if (!success)
        Log.warn(this.toString()+" "+message.toString());
}
1524
1525
1526
1527
1528
1529
1530
/**
 * Hook called by the Handshake exchange for each handshake outcome.
 * This implementation logs a warning for failures and ignores successes.
 *
 * @param success whether the handshake succeeded
 * @param reestablish flag supplied by the caller; unused by this implementation
 * @param message the handshake response, or a locally built error message
 */
protected void metaHandshake(boolean success, boolean reestablish, Message message)
{
    if (!success)
        Log.warn(this.toString()+" "+message.toString());
}
1536
1537
1538
1539
1540
/**
 * Hook called when a publish could not be sent or did not complete.
 * This implementation only logs the failure.
 *
 * @param e the cause; null when the publish expired
 * @param messages the messages that were being published; may be null and is unused here
 */
protected void metaPublishFail(Throwable e, Message[] messages)
{
    Log.warn(this.toString()+": "+e);
    Log.debug(e);
}
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559 protected String extendOut(String msg)
1560 {
1561 if (_extensions==null)
1562 return msg;
1563
1564 try
1565 {
1566 Message[] messages = _msgPool.parse(msg);
1567 for (int i=0; i<messages.length; i++)
1568 extendOut(messages[i]);
1569 if (messages.length==1 && msg.charAt(0)=='{')
1570 return _msgPool.getMsgJSON().toJSON(messages[0]);
1571 return _msgPool.getMsgJSON().toJSON(messages);
1572 }
1573 catch(IOException e)
1574 {
1575 Log.warn(e);
1576 return msg;
1577 }
1578 }
1579
1580
1581
1582
1583
1584
1585
1586 protected void extendOut(Message message)
1587 {
1588 if (_extensions!=null)
1589 {
1590 Message m = message;
1591 String channelId = m.getChannel();
1592 if (channelId != null)
1593 {
1594 if (channelId.startsWith(Bayeux.META_SLASH))
1595 for (int i=0;m!=null && i<_extensions.length;i++)
1596 m=_extensions[i].sendMeta(this,m);
1597 else
1598 for (int i=0;m!=null && i<_extensions.length;i++)
1599 m=_extensions[i].send(this,m);
1600 }
1601
1602 if (message!=m)
1603 {
1604 message.clear();
1605 if (m!=null)
1606 for (Map.Entry<String,Object> entry:m.entrySet())
1607 message.put(entry.getKey(),entry.getValue());
1608 }
1609 }
1610 }
1611
1612
1613
1614
1615
1616
1617
1618 protected void extendIn(Message message)
1619 {
1620 if (_extensions!=null)
1621 {
1622 Message m = message;
1623 String channelId = m.getChannel();
1624 if (channelId != null)
1625 {
1626 if (channelId.startsWith(Bayeux.META_SLASH))
1627 for (int i=_extensions.length;m!=null && i-->0;)
1628 m=_extensions[i].rcvMeta(this,m);
1629 else
1630 for (int i=_extensions.length;m!=null && i-->0;)
1631 m=_extensions[i].rcv(this,m);
1632 }
1633
1634 if (message!=m)
1635 {
1636 message.clear();
1637 if (m!=null)
1638 for (Map.Entry<String,Object> entry:m.entrySet())
1639 message.put(entry.getKey(),entry.getValue());
1640 }
1641 }
1642 }
1643
1644 private static class ExpirableCookie
1645 {
1646 private final Cookie cookie;
1647 private final long expirationTime;
1648
1649 private ExpirableCookie(Cookie cookie, long expirationTime)
1650 {
1651 this.cookie = cookie;
1652 this.expirationTime = expirationTime;
1653 }
1654
1655 private boolean isExpired()
1656 {
1657 if (expirationTime < 0) return false;
1658 return System.currentTimeMillis() >= expirationTime;
1659 }
1660 }
1661 }