1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.client;
16
17 import java.io.IOException;
18 import java.net.URLEncoder;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Queue;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import javax.servlet.http.Cookie;
29
30 import org.cometd.Bayeux;
31 import org.cometd.Client;
32 import org.cometd.ClientListener;
33 import org.cometd.Listener;
34 import org.cometd.Message;
35 import org.cometd.MessageListener;
36 import org.cometd.RemoveListener;
37 import org.mortbay.cometd.MessageImpl;
38 import org.mortbay.cometd.MessagePool;
39 import org.mortbay.io.Buffer;
40 import org.mortbay.io.ByteArrayBuffer;
41 import org.mortbay.jetty.HttpHeaders;
42 import org.mortbay.jetty.HttpSchemes;
43 import org.mortbay.jetty.client.Address;
44 import org.mortbay.jetty.client.HttpClient;
45 import org.mortbay.jetty.client.HttpExchange;
46 import org.mortbay.log.Log;
47 import org.mortbay.util.ArrayQueue;
48 import org.mortbay.util.QuotedStringTokenizer;
49 import org.mortbay.util.ajax.JSON;
50
51
52
53
54
55
56
57
58
59
60
61 public class BayeuxClient extends MessagePool implements Client
62 {
63 private HttpClient _client;
64 private Address _address;
65 private HttpExchange _pull;
66 private HttpExchange _push;
67 private String _uri="/cometd";
68 private boolean _initialized=false;
69 private boolean _disconnecting=false;
70 private String _clientId;
71 private Listener _listener;
72 private List<RemoveListener> _rListeners;
73 private List<MessageListener> _mListeners;
74 private Queue<Message> _inQ;
75 private Queue<Message> _outQ;
76 private int _batch;
77 private boolean _formEncoded;
78 private Map<String, Cookie> _cookies=new ConcurrentHashMap<String, Cookie>();
79 private Advice _advice;
80 private Timer _timer;
81
82
/**
 * Creates a Bayeux client bound to the given HTTP client, server address and URI.
 *
 * @param client  HTTP client used to send the exchanges
 * @param address server address set on each exchange
 * @param uri     base URI of the exchanges
 * @param timer   timer used for delayed re-connects; when null, a daemon timer
 *                named "DefaultBayeuxClientTimer" is created
 * @throws IOException declared by the signature; the statements in this body do not throw it themselves
 */
public BayeuxClient(HttpClient client, Address address, String uri, Timer timer) throws IOException
{
    _inQ = new ArrayQueue<Message>();
    _outQ = new ArrayQueue<Message>();

    _client = client;
    _address = address;
    _uri = uri;

    _timer = (timer != null) ? timer : new Timer("DefaultBayeuxClientTimer", true);
}
96
/**
 * Creates a Bayeux client that uses its own default timer.
 *
 * @param client  HTTP client used to send the exchanges
 * @param address server address set on each exchange
 * @param uri     base URI of the exchanges
 * @throws IOException declared by the signature of the delegated constructor
 */
public BayeuxClient(HttpClient client, Address address, String uri) throws IOException
{
    // A null timer makes the four-argument constructor create the daemon
    // timer named "DefaultBayeuxClientTimer".
    this(client, address, uri, (Timer)null);
}
101
102
103
104
105
106
107 public String getId()
108 {
109 return _clientId;
110 }
111
112
113 public void start()
114 {
115 synchronized (_outQ)
116 {
117 if (!_initialized && _pull==null)
118 _pull=new Handshake();
119 }
120 }
121
122
123 public boolean isPolling()
124 {
125 synchronized (_outQ)
126 {
127 return _pull!=null;
128 }
129 }
130
131
132
133
134
135
136 public void deliver(Client from, Message message)
137 {
138 synchronized (_inQ)
139 {
140 if (_mListeners==null)
141 _inQ.add(message);
142 else
143 {
144 for (MessageListener l : _mListeners)
145 l.deliver(from,this,message);
146 }
147 }
148 }
149
150
151
152
153
154 public void deliver(Client from, String toChannel, Object data, String id)
155 {
156 Message message = new MessageImpl();
157
158 message.put(Bayeux.CHANNEL_FIELD,toChannel);
159 message.put(Bayeux.DATA_FIELD,data);
160 if (id!=null)
161 message.put(Bayeux.ID_FIELD,id);
162
163 synchronized (_inQ)
164 {
165 if (_mListeners==null)
166 _inQ.add(message);
167 else
168 {
169 for (MessageListener l : _mListeners)
170 l.deliver(from,this,message);
171 }
172 }
173 }
174
175
176
177
178
179 public Listener getListener()
180 {
181 synchronized (_inQ)
182 {
183 return _listener;
184 }
185 }
186
187
188
189
190
191 public boolean hasMessages()
192 {
193 synchronized (_inQ)
194 {
195 return _inQ.size()>0;
196 }
197 }
198
199
200
201
202
203 public boolean isLocal()
204 {
205 return false;
206 }
207
208
209
210
211
212
213 private void publish(Message msg)
214 {
215 synchronized (_outQ)
216 {
217 _outQ.add(msg);
218
219 if (_batch==0&&_initialized&&_push==null)
220 _push=new Publish();
221 }
222 }
223
224
225
226
227
228 public void publish(String toChannel, Object data, String msgId)
229 {
230 Message msg=new MessageImpl();
231 msg.put(Bayeux.CHANNEL_FIELD,toChannel);
232 msg.put(Bayeux.DATA_FIELD,data);
233 if (msgId!=null)
234 msg.put(Bayeux.ID_FIELD,msgId);
235 publish(msg);
236 }
237
238
239
240
241
242 public void subscribe(String toChannel)
243 {
244 Message msg=new MessageImpl();
245 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_SUBSCRIBE);
246 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
247 publish(msg);
248 }
249
250
251
252
253
254 public void unsubscribe(String toChannel)
255 {
256 Message msg=new MessageImpl();
257 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_UNSUBSCRIBE);
258 msg.put(Bayeux.SUBSCRIPTION_FIELD,toChannel);
259 publish(msg);
260 }
261
262
263
264
265
266 public void remove(boolean timeout)
267 {
268 Message msg=new MessageImpl();
269 msg.put(Bayeux.CHANNEL_FIELD,Bayeux.META_DISCONNECT);
270
271 synchronized (_outQ)
272 {
273 _outQ.add(msg);
274
275 _initialized=false;
276 _disconnecting=true;
277
278 if (_batch==0&&_initialized&&_push==null)
279 _push=new Publish();
280
281 }
282 }
283
284
285
286
287
288 public void setListener(Listener listener)
289 {
290 synchronized (_inQ)
291 {
292 if (_listener!=null)
293 removeListener(_listener);
294 _listener=listener;
295 if (_listener!=null)
296 addListener(_listener);
297 }
298 }
299
300
301
302
303
304
305
306 public List<Message> takeMessages()
307 {
308 synchronized (_inQ)
309 {
310 LinkedList<Message> list=new LinkedList<Message>(_inQ);
311 _inQ.clear();
312 return list;
313 }
314 }
315
316
317
318
319
320 public void endBatch()
321 {
322 synchronized (_outQ)
323 {
324 if (--_batch<=0)
325 {
326 _batch=0;
327 if ((_initialized||_disconnecting)&&_push==null&&_outQ.size()>0)
328 _push=new Publish();
329 }
330 }
331 }
332
333
334
335
336
337 public void startBatch()
338 {
339 synchronized (_outQ)
340 {
341 _batch++;
342 }
343 }
344
345
346
347
348
349
350
351 protected void customize(HttpExchange exchange)
352 {
353 StringBuilder buf=null;
354 for (Cookie cookie : _cookies.values())
355 {
356 if (buf==null)
357 buf=new StringBuilder();
358 else
359 buf.append("; ");
360 buf.append(cookie.getName());
361 buf.append("=");
362 buf.append(cookie.getValue());
363 }
364 if (buf!=null)
365 exchange.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
366 }
367
368
369 public void setCookie(Cookie cookie)
370 {
371 _cookies.put(cookie.getName(),cookie);
372 }
373
374
375
376
377 private class Exchange extends HttpExchange.ContentExchange
378 {
379 Object[] _responses;
380 int _connectFailures;
381
382 Exchange(String info)
383 {
384 setMethod("POST");
385 setScheme(HttpSchemes.HTTP_BUFFER);
386 setAddress(_address);
387 setURI(_uri+"/"+info);
388
389 setRequestContentType(_formEncoded?"application/x-www-form-urlencoded;charset=utf-8":"text/json;charset=utf-8");
390 }
391
392 protected void setMessage(String message)
393 {
394 try
395 {
396 if (_formEncoded)
397 setRequestContent(new ByteArrayBuffer("message="+URLEncoder.encode(message,"utf-8")));
398 else
399 setRequestContent(new ByteArrayBuffer(message,"utf-8"));
400 }
401 catch (Exception e)
402 {
403 Log.warn(e);
404 }
405 }
406
407 protected void setMessages(Queue<Message> messages)
408 {
409 try
410 {
411 for (Message msg : messages)
412 {
413 msg.put(Bayeux.CLIENT_FIELD,_clientId);
414 }
415 String json=JSON.toString(messages);
416
417 if (_formEncoded)
418 setRequestContent(new ByteArrayBuffer("message="+URLEncoder.encode(json,"utf-8")));
419 else
420 setRequestContent(new ByteArrayBuffer(json,"utf-8"));
421
422 }
423 catch (Exception e)
424 {
425 Log.warn(e);
426 }
427
428 }
429
430
431 protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
432 {
433 super.onResponseStatus(version,status,reason);
434 }
435
436
437 protected void onResponseHeader(Buffer name, Buffer value) throws IOException
438 {
439 super.onResponseHeader(name,value);
440 if (HttpHeaders.CACHE.getOrdinal(name)==HttpHeaders.SET_COOKIE_ORDINAL)
441 {
442 String cname=null;
443 String cvalue=null;
444
445 QuotedStringTokenizer tok=new QuotedStringTokenizer(value.toString(),"=;",false,false);
446 tok.setSingle(false);
447
448 if (tok.hasMoreElements())
449 cname=tok.nextToken();
450 if (tok.hasMoreElements())
451 cvalue=tok.nextToken();
452
453 Cookie cookie=new Cookie(cname,cvalue);
454
455 while (tok.hasMoreTokens())
456 {
457 String token=tok.nextToken();
458 if ("Version".equalsIgnoreCase(token))
459 cookie.setVersion(Integer.parseInt(tok.nextToken()));
460 else if ("Comment".equalsIgnoreCase(token))
461 cookie.setComment(tok.nextToken());
462 else if ("Path".equalsIgnoreCase(token))
463 cookie.setPath(tok.nextToken());
464 else if ("Domain".equalsIgnoreCase(token))
465 cookie.setDomain(tok.nextToken());
466 else if ("Expires".equalsIgnoreCase(token))
467 {
468 tok.nextToken();
469
470 }
471 else if ("Max-Age".equalsIgnoreCase(token))
472 {
473 tok.nextToken();
474
475 }
476 else if ("Secure".equalsIgnoreCase(token))
477 cookie.setSecure(true);
478 }
479
480 BayeuxClient.this.setCookie(cookie);
481 }
482 }
483
484
485 protected void onResponseComplete() throws IOException
486 {
487 super.onResponseComplete();
488
489 if (getResponseStatus()==200)
490 {
491 String content = getResponseContent();
492 if (content==null || content.length()==0)
493 throw new IllegalStateException();
494 _responses=parse(content);
495 }
496 }
497
498
499 protected void onExpire()
500 {
501 super.onExpire();
502 }
503
504
505 protected void onConnectionFailed(Throwable ex)
506 {
507 super.onConnectionFailed(ex);
508 if (++_connectFailures<5)
509 {
510 try
511 {
512 _client.send(this);
513 }
514 catch (IOException e)
515 {
516 Log.warn(e);
517 }
518 }
519 }
520
521
522 protected void onException(Throwable ex)
523 {
524 super.onException(ex);
525 }
526
527 }
528
529
530
531
532
533
534 private class Handshake extends Exchange
535 {
536 final static String __HANDSHAKE="[{"+"\"channel\":\"/meta/handshake\","+"\"version\":\"0.9\","+"\"minimumVersion\":\"0.9\""+"}]";
537
538 Handshake()
539 {
540 super("handshake");
541 setMessage(__HANDSHAKE);
542
543 try
544 {
545 customize(this);
546 _client.send(this);
547 }
548 catch (IOException e)
549 {
550 Log.warn(e);
551 }
552 }
553
554
555
556
557
558 protected void onException(Throwable ex)
559 {
560 Log.warn("Handshake:"+ex);
561 Log.debug(ex);
562 }
563
564
565
566
567
568 protected void onResponseComplete() throws IOException
569 {
570 super.onResponseComplete();
571 if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
572 {
573 Map<?,?> response=(Map<?,?>)_responses[0];
574 Boolean successful=(Boolean)response.get(Bayeux.SUCCESSFUL_FIELD);
575 if (successful!=null&&successful.booleanValue())
576 {
577 _clientId=(String)response.get(Bayeux.CLIENT_FIELD);
578 _pull=new Connect();
579 }
580 else
581 throw new IOException("Handshake failed:"+_responses[0]);
582 }
583 else
584 {
585 throw new IOException("Handshake failed: "+getResponseStatus());
586 }
587 }
588 }
589
590
591
592
593
594 private class Connect extends Exchange
595 {
596 Connect()
597 {
598 super("connect");
599 String connect="{"+"\"channel\":\"/meta/connect\","+"\"clientId\":\""+_clientId+"\","+"\"connectionType\":\"long-polling\""+"}";
600 setMessage(connect);
601
602 try
603 {
604 customize(this);
605 _client.send(this);
606 }
607 catch (IOException e)
608 {
609 Log.warn(e);
610 }
611 }
612
613 protected void onResponseComplete() throws IOException
614 {
615 super.onResponseComplete();
616 if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
617 {
618 try
619 {
620 startBatch();
621
622 for (int i=0; i<_responses.length; i++)
623 {
624 Message msg=(Message)_responses[i];
625
626 if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD)))
627 {
628 Boolean successful=(Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD);
629 if (successful!=null&&successful.booleanValue())
630 {
631 if (!_initialized)
632 {
633 _initialized=true;
634 synchronized (_outQ)
635 {
636 if (_outQ.size()>0)
637 _push=new Publish();
638 }
639 }
640
641 Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD);
642 if (adviceField != null)
643 _advice = new Advice(adviceField);
644
645
646
647 if (_advice != null && _advice.getInterval() > 0)
648 {
649 TimerTask task = new TimerTask()
650 {
651 public void run()
652 {
653 _pull=new Connect();
654 }
655 };
656 _timer.schedule(task, _advice.getInterval());
657 }
658 else
659 _pull=new Connect();
660 }
661 else
662 throw new IOException("Connect failed:"+_responses[0]);
663 }
664
665 deliver(null,msg);
666 }
667 }
668 finally
669 {
670 endBatch();
671 }
672
673 }
674 else
675 {
676 throw new IOException("Connect failed: "+getResponseStatus());
677 }
678 }
679 }
680
681
682
683
684
685
686 private class Publish extends Exchange
687 {
688 Publish()
689 {
690 super("publish");
691 synchronized (_outQ)
692 {
693 if (_outQ.size()==0)
694 return;
695 setMessages(_outQ);
696 _outQ.clear();
697 }
698 try
699 {
700 customize(this);
701 _client.send(this);
702 }
703 catch (IOException e)
704 {
705 Log.warn(e);
706 }
707 }
708
709
710
711
712
713 protected void onResponseComplete() throws IOException
714 {
715 super.onResponseComplete();
716
717 try
718 {
719 synchronized (_outQ)
720 {
721 startBatch();
722 _push=null;
723 }
724
725 if (getResponseStatus()==200&&_responses!=null&&_responses.length>0)
726 {
727
728 for (int i=0; i<_responses.length; i++)
729 {
730 Message msg=(Message)_responses[i];
731 deliver(null,msg);
732 }
733 }
734 else
735 {
736 throw new IOException("Reconnect failed: "+getResponseStatus());
737 }
738 }
739 finally
740 {
741 endBatch();
742 }
743 }
744 }
745
746 public void addListener(ClientListener listener)
747 {
748 synchronized(_inQ)
749 {
750 if (listener instanceof MessageListener)
751 {
752 if (_mListeners==null)
753 _mListeners=new ArrayList<MessageListener>();
754 _mListeners.add((MessageListener)listener);
755 }
756 if (listener instanceof RemoveListener)
757 {
758 if (_rListeners==null)
759 _rListeners=new ArrayList<RemoveListener>();
760 _rListeners.add((RemoveListener)listener);
761 }
762 }
763 }
764
765 public void removeListener(ClientListener listener)
766 {
767 synchronized(_inQ)
768 {
769 if (listener instanceof MessageListener)
770 {
771 if (_mListeners!=null)
772 _mListeners.remove((MessageListener)listener);
773 }
774 if (listener instanceof RemoveListener)
775 {
776 if (_rListeners!=null)
777 _rListeners.remove((RemoveListener)listener);
778 }
779 }
780 }
781
782 public int getMaxQueue()
783 {
784 return -1;
785 }
786
787 public Queue<Message> getQueue()
788 {
789 return _inQ;
790 }
791
792 public void setMaxQueue(int max)
793 {
794 if( max!=-1)
795 throw new UnsupportedOperationException();
796 }
797 }