1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.io.IOException;
18 import java.security.SecureRandom;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.ListIterator;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30
31 import javax.servlet.ServletContext;
32 import javax.servlet.http.HttpServletRequest;
33
34 import org.cometd.Bayeux;
35 import org.cometd.BayeuxListener;
36 import org.cometd.Channel;
37 import org.cometd.ChannelBayeuxListener;
38 import org.cometd.Client;
39 import org.cometd.ClientBayeuxListener;
40 import org.cometd.Extension;
41 import org.cometd.Message;
42 import org.cometd.SecurityPolicy;
43 import org.mortbay.util.ajax.JSON;
44
45
46
47
48
49
50
51
52 public abstract class AbstractBayeux extends MessagePool implements Bayeux
53 {
// Pre-built ChannelId instances for the Bayeux meta channels.
public static final ChannelId META_ID=new ChannelId(META);
public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
public static final ChannelId META_PING_ID=new ChannelId(META_PING);
public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);


// "ext" payload placed in handshake replies when JSON comment filtering is enabled.
private static final Map<String,Object> EXT_JSON_COMMENTED=new HashMap<String,Object>(2){
    {
        this.put("json-comment-filtered",Boolean.TRUE);
    }
};


// Meta channel name -> handler; populated once by the constructor.
private HashMap<String,Handler> _handlers=new HashMap<String,Handler>();

// Root ("/") of the channel tree; all channel lookups and deliveries start here.
private ChannelImpl _root = new ChannelImpl("/",this);
// Registered clients keyed by client id.
private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
// Policy consulted by the handshake, publish and subscribe handlers.
protected SecurityPolicy _securityPolicy=new DefaultPolicy();
// Current advice; replaced through setAdvice().
protected JSON.Literal _advice;
// Advice variant derived from _advice in setAdvice() for the multi-frame case.
protected JSON.Literal _multiFrameAdvice;
// Incremented on every setAdvice(); compared with the per-client version in ConnectHandler.
protected int _adviceVersion=0;
// Advice attached to "402::Unknown client" replies.
protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
// Log threshold: >0 enables info logging, >1 enables debug logging.
protected int _logLevel;
// Value written as "timeout" into the generated advice.
protected long _timeout=240000;
// Value written as "interval" into the generated advice.
protected long _interval=0;
// NOTE(review): only read/written through its accessors in this file.
protected long _maxInterval=30000;
// When true, handshakes may negotiate comment-filtered JSON.
protected boolean _JSONCommented;
// Set to true by initialize().
protected boolean _initialized;
// Browser id -> ids of the clients registered for that browser.
protected ConcurrentHashMap<String, List<String>> _browser2client=new ConcurrentHashMap<String, List<String>>();
// When >0, used as the retry interval in multi-frame advice; otherwise that advice says "none".
protected int _multiFrameInterval=-1;

// When true, PublishHandler delivers the received message itself instead of calling doPublish().
protected boolean _directDeliver=true;
// NOTE(review): only read/written through its accessors in this file.
protected boolean _requestAvailable;
// Per-thread request slot backing getCurrentRequest()/setCurrentRequest().
protected ThreadLocal<HttpServletRequest> _request = new ThreadLocal<HttpServletRequest>();

// Servlet context used for logging; assigned in initialize().
transient ServletContext _context;
// Random source for id generation; created in initialize().
transient Random _random;
// Cache of parsed channel ids; created in initialize().
transient ConcurrentHashMap<String, ChannelId> _channelIdCache;
// Handler for messages on ordinary (non-meta) channels.
protected Handler _publishHandler;
// Handler that delivers meta messages to the channel tree.
protected Handler _metaPublishHandler;
// NOTE(review): only read/written through its accessors in this file.
protected int _maxClientQueue=-1;

// Registered extensions; inbound messages traverse them in reverse order, outbound in order.
protected List<Extension> _extensions=new CopyOnWriteArrayList<Extension>();
// JSON array of the connection types advertised in handshake replies.
protected JSON.Literal _transports=new JSON.Literal("[\""+Bayeux.TRANSPORT_LONG_POLL+ "\",\""+Bayeux.TRANSPORT_CALLBACK_POLL+"\"]");
// Listeners notified when clients are added or removed.
protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
// Listeners notified when channels are added or removed.
protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
105
106
107
108
109
110
111
112 protected AbstractBayeux()
113 {
114 _publishHandler=new PublishHandler();
115 _metaPublishHandler=new MetaPublishHandler();
116 _handlers.put(META_HANDSHAKE,new HandshakeHandler());
117 _handlers.put(META_CONNECT,new ConnectHandler());
118 _handlers.put(META_DISCONNECT,new DisconnectHandler());
119 _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
120 _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
121 _handlers.put(META_PING,new PingHandler());
122
123 setTimeout(getTimeout());
124 }
125
126
127 public void addExtension(Extension ext)
128 {
129 _extensions.add(ext);
130 }
131
132
133 public List<Extension> getExtensions()
134 {
135
136 return _extensions;
137 }
138
139
140 public void removeExtension(Extension ext)
141 {
142 _extensions.remove(ext);
143 }
144
145
146
147
148
149
150 public ChannelImpl getChannel(ChannelId id)
151 {
152 return _root.getChild(id);
153 }
154
155
156 public ChannelImpl getChannel(String id)
157 {
158 ChannelId cid=getChannelId(id);
159 if (cid.depth()==0)
160 return null;
161 return _root.getChild(cid);
162 }
163
164
165 public Channel getChannel(String id, boolean create)
166 {
167 synchronized(this)
168 {
169 ChannelImpl channel=getChannel(id);
170
171 if (channel==null && create)
172 {
173 channel=new ChannelImpl(id,this);
174 _root.addChild(channel);
175
176 if (isLogInfo())
177 logInfo("newChannel: "+channel);
178 }
179 return channel;
180 }
181 }
182
183
184 public ChannelId getChannelId(String id)
185 {
186 ChannelId cid = _channelIdCache.get(id);
187 if (cid==null)
188 {
189
190 cid=new ChannelId(id);
191 _channelIdCache.put(id,cid);
192 }
193 return cid;
194 }
195
196
197
198
199
200 public Client getClient(String client_id)
201 {
202 synchronized(this)
203 {
204 if (client_id==null)
205 return null;
206 Client client = _clients.get(client_id);
207 return client;
208 }
209 }
210
211
212 public Set<String> getClientIDs()
213 {
214 return _clients.keySet();
215 }
216
217
218
219
220
221 public long getMaxInterval()
222 {
223 return _maxInterval;
224 }
225
226
227
228
229
230 public int getLogLevel()
231 {
232 return _logLevel;
233 }
234
235
236
237
238
239 public SecurityPolicy getSecurityPolicy()
240 {
241 return _securityPolicy;
242 }
243
244
245 public long getTimeout()
246 {
247 return _timeout;
248 }
249
250
251 public long getInterval()
252 {
253 return _interval;
254 }
255
256
257
258
259
260
261 public boolean isDirectDeliver()
262 {
263 return _directDeliver;
264 }
265
266
267
268
269
270
271 public void setDirectDeliver(boolean directDeliver)
272 {
273 _directDeliver = directDeliver;
274 }
275
276
277
278
279
280
281
282
283 public String handle(ClientImpl client, Transport transport, Message message) throws IOException
284 {
285 String channel_id=message.getChannel();
286
287 Handler handler=(Handler)_handlers.get(channel_id);
288 if (handler!=null)
289 {
290
291 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
292 while(iter.hasPrevious())
293 message=iter.previous().rcvMeta(message);
294
295 handler.handle(client,transport,message);
296 _metaPublishHandler.handle(client,transport,message);
297 }
298 else if (channel_id.startsWith(META_SLASH))
299 {
300
301 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
302 while(iter.hasPrevious())
303 message=iter.previous().rcvMeta(message);
304 _metaPublishHandler.handle(client,transport,message);
305 }
306 else
307 {
308
309 handler=_publishHandler;
310 ListIterator<Extension> iter = _extensions.listIterator(_extensions.size());
311 while(iter.hasPrevious())
312 message=iter.previous().rcv(message);
313 handler.handle(client,transport,message);
314 }
315
316 return channel_id;
317 }
318
319
320 public boolean hasChannel(String id)
321 {
322 ChannelId cid=getChannelId(id);
323 return _root.getChild(cid)!=null;
324 }
325
326
327 public boolean isInitialized()
328 {
329 return _initialized;
330 }
331
332
333
334
335
336 public boolean isJSONCommented()
337 {
338 return _JSONCommented;
339 }
340
341
342 public boolean isLogDebug()
343 {
344 return _logLevel>1;
345 }
346
347
348 public boolean isLogInfo()
349 {
350 return _logLevel>0;
351 }
352
353
354 public void logDebug(String message)
355 {
356 if (_logLevel>1)
357 _context.log(message);
358 }
359
360
361 public void logDebug(String message, Throwable th)
362 {
363 if (_logLevel>1)
364 _context.log(message,th);
365 }
366
367
368 public void logWarn(String message, Throwable th)
369 {
370 _context.log(message+": "+th.toString());
371 }
372
373
374 public void logWarn(String message)
375 {
376 _context.log(message);
377 }
378
379
380 public void logInfo(String message)
381 {
382 if (_logLevel>0)
383 _context.log(message);
384 }
385
386
387 public Client newClient(String idPrefix)
388 {
389 ClientImpl client = new ClientImpl(this,idPrefix);
390 return client;
391 }
392
393
/**
 * Creates the client object for a newly handshaken remote peer.
 * The handshake handler treats a {@code null} result as a failed handshake.
 *
 * @return the new client, or {@code null} if one cannot be created
 */
public abstract ClientImpl newRemoteClient();
395
396
397
398
399
400
401
402 public Transport newTransport(ClientImpl client, Map<?,?> message)
403 {
404 if (isLogDebug())
405 logDebug("newTransport: client="+client+",message="+message);
406
407 Transport result=null;
408
409 try
410 {
411 String type=client==null?null:client.getConnectionType();
412 if (type==null)
413 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
414
415 if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type) || type==null)
416 {
417 String jsonp=(String)message.get(Bayeux.JSONP_PARAMETER);
418 if(jsonp!=null)
419 result=new JSONPTransport(client!=null&&client.isJSONCommented(),jsonp);
420 else
421 result=new JSONTransport(client!=null&&client.isJSONCommented());
422 }
423 else
424 result=new JSONTransport(client!=null&&client.isJSONCommented());
425
426 }
427 catch (Exception e)
428 {
429 throw new RuntimeException(e);
430 }
431
432 if (isLogDebug())
433 logDebug("newTransport: result="+result);
434 return result;
435 }
436
437
438
439
440
441
442
443
444
445 protected void doPublish(ChannelId to, Client from, Object data, String msgId)
446 {
447 Message msg = newMessage();
448 msg.put(CHANNEL_FIELD,to.toString());
449
450 if (msgId==null)
451 {
452 long id=msg.hashCode()
453 ^(to==null?0:to.hashCode())
454 ^(from==null?0:from.hashCode());
455 id=id<0?-id:id;
456 msg.put(ID_FIELD,Long.toString(id,36));
457 }
458 else
459 msg.put(ID_FIELD,msgId);
460
461 msg.put(DATA_FIELD,data);
462
463 for (Extension e:_extensions)
464 msg=e.send(msg);
465 _root.doDelivery(to,from,msg);
466 ((MessageImpl)msg).decRef();
467 }
468
469
470 public boolean removeChannel(ChannelImpl channel)
471 {
472 boolean removed = _root.doRemove(channel);
473 if (removed)
474 for (ChannelBayeuxListener l : _channelListeners)
475 l.channelRemoved(channel);
476 return removed;
477 }
478
479
480 public void addChannel(ChannelImpl channel)
481 {
482 for (ChannelBayeuxListener l : _channelListeners)
483 l.channelAdded(channel);
484 }
485
486
487 protected String newClientId(long variation, String idPrefix)
488 {
489 if (idPrefix==null)
490 return Long.toString(getRandom(),36)+Long.toString(variation,36);
491 else
492 return idPrefix+"_"+Long.toString(getRandom(),36);
493 }
494
495
496 protected void addClient(ClientImpl client,String idPrefix)
497 {
498 while(true)
499 {
500 String id = newClientId(client.hashCode(),idPrefix);
501 client.setId(id);
502
503 ClientImpl other = _clients.putIfAbsent(id,client);
504 if (other==null)
505 {
506 for (ClientBayeuxListener l : _clientListeners)
507 l.clientAdded((Client)client);
508
509 return;
510 }
511 }
512 }
513
514
515
516
517
518 public Client removeClient(String client_id)
519 {
520 ClientImpl client;
521 synchronized(this)
522 {
523 if (client_id==null)
524 return null;
525 client = _clients.remove(client_id);
526 }
527 if (client!=null)
528 {
529 client.unsubscribeAll();
530 for (ClientBayeuxListener l : _clientListeners)
531 l.clientRemoved((Client)client);
532 }
533 return client;
534 }
535
536
537
538
539
540 public void setMaxInterval(long ms)
541 {
542 _maxInterval=ms;
543 }
544
545
546
547
548
549 public void setJSONCommented(boolean commented)
550 {
551 _JSONCommented=commented;
552 }
553
554
555
556
557
558
559 public void setLogLevel(int logLevel)
560 {
561 _logLevel=logLevel;
562 }
563
564
565
566
567
568 public void setSecurityPolicy(SecurityPolicy securityPolicy)
569 {
570 _securityPolicy=securityPolicy;
571 }
572
573
574
575 public void setTimeout(long ms)
576 {
577 _timeout = ms;
578 generateAdvice();
579 }
580
581
582
583 public void setInterval(long ms)
584 {
585 _interval = ms;
586 generateAdvice();
587 }
588
589
590
591
592
593
594
595
596 public void setMultiFrameInterval(int multiFrameInterval)
597 {
598 _multiFrameInterval=multiFrameInterval;
599 generateAdvice();
600 }
601
602
603
604
605
606 public int getMultiFrameInterval()
607 {
608 return _multiFrameInterval;
609 }
610
611
612 void generateAdvice()
613 {
614 setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":"+getInterval()+",\"timeout\":"+getTimeout()+"}"));
615 }
616
617
618 public void setAdvice(JSON.Literal advice)
619 {
620 synchronized(this)
621 {
622 _adviceVersion++;
623 _advice=advice;
624 _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
625 }
626 }
627
628
629 private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
630 {
631 Map<String,Object> a = (Map<String,Object>)JSON.parse(_advice.toString());
632 a.put("multiple-clients",Boolean.TRUE);
633 if (_multiFrameInterval>0)
634 {
635 a.put("reconnect","retry");
636 a.put("interval",_multiFrameInterval);
637 }
638 else
639 a.put("reconnect","none");
640 return a;
641 }
642
643
644
645
646 public JSON.Literal getAdvice()
647 {
648 return _advice;
649 }
650
651
652
653
654
655 public boolean isRequestAvailable()
656 {
657 return _requestAvailable;
658 }
659
660
661
662
663
664 public void setRequestAvailable(boolean requestAvailable)
665 {
666 _requestAvailable=requestAvailable;
667 }
668
669
670
671
672
673 public HttpServletRequest getCurrentRequest()
674 {
675 return _request.get();
676 }
677
678
679
680
681
682 void setCurrentRequest(HttpServletRequest request)
683 {
684 _request.set(request);
685 }
686
687
688
689
690 public Collection<Channel> getChannels()
691 {
692 List<Channel> channels = new ArrayList<Channel>();
693 _root.getChannels(channels);
694 return channels;
695 }
696
697
698
699
700
701 public int getChannelCount()
702 {
703 return _root.getChannelCount();
704 }
705
706
707 public Collection<Client> getClients()
708 {
709 synchronized(this)
710 {
711 return new ArrayList<Client>(_clients.values());
712 }
713 }
714
715
716
717
718
719 public int getClientCount()
720 {
721 synchronized(this)
722 {
723 return _clients.size();
724 }
725 }
726
727
728 public boolean hasClient(String clientId)
729 {
730 synchronized(this)
731 {
732 if (clientId==null)
733 return false;
734 return _clients.containsKey(clientId);
735 }
736 }
737
738
739 public Channel removeChannel(String channelId)
740 {
741 Channel channel = getChannel(channelId);
742
743 boolean removed = false;
744 if (channel!=null)
745 removed = channel.remove();
746
747 if (removed)
748 return channel;
749 else
750 return null;
751 }
752
753
754 protected void initialize(ServletContext context)
755 {
756 synchronized(this)
757 {
758 _initialized=true;
759 _context=context;
760 try
761 {
762 _random=SecureRandom.getInstance("SHA1PRNG");
763 }
764 catch (Exception e)
765 {
766 context.log("Could not get secure random for ID generation",e);
767 _random=new Random();
768 }
769 _random.setSeed(_random.nextLong()^hashCode()^(context.hashCode()<<32)^Runtime.getRuntime().freeMemory());
770 _channelIdCache=new ConcurrentHashMap<String, ChannelId>();
771
772 _root.addChild(new ServiceChannel(Bayeux.SERVICE));
773
774 }
775 }
776
777
778 long getRandom()
779 {
780 long l=_random.nextLong();
781 return l<0?-l:l;
782 }
783
784
785 void clientOnBrowser(String browserId,String clientId)
786 {
787 List<String> clients=_browser2client.get(browserId);
788 if (clients==null)
789 {
790 List<String> new_clients=new CopyOnWriteArrayList<String>();
791 clients=_browser2client.putIfAbsent(browserId,new_clients);
792 if (clients==null)
793 clients=new_clients;
794 }
795 clients.add(clientId);
796 }
797
798
799 void clientOffBrowser(String browserId,String clientId)
800 {
801 List<String> clients=_browser2client.get(browserId);
802 if (clients!=null)
803 clients.remove(clientId);
804 }
805
806
807 List<String> clientsOnBrowser(String browserId)
808 {
809 List<String> clients=_browser2client.get(browserId);
810 if (clients==null)
811 return Collections.emptyList();
812 return clients;
813 }
814
815
816 public void addListener(BayeuxListener listener)
817 {
818 if (listener instanceof ClientBayeuxListener)
819 _clientListeners.add((ClientBayeuxListener)listener);
820 else if(listener instanceof ChannelBayeuxListener)
821 _channelListeners.add((ChannelBayeuxListener)listener);
822 }
823
824
825 public int getMaxClientQueue()
826 {
827 return _maxClientQueue;
828 }
829
830
831 public void setMaxClientQueue(int size)
832 {
833 _maxClientQueue=size;
834 }
835
836
837
838
839
840 public static class DefaultPolicy implements SecurityPolicy
841 {
842 public boolean canHandshake(Message message)
843 {
844 return true;
845 }
846
847 public boolean canCreate(Client client, String channel, Message message)
848 {
849 return client!=null && !channel.startsWith(Bayeux.META_SLASH);
850 }
851
852 public boolean canSubscribe(Client client, String channel, Message message)
853 {
854 if (client!=null && ("/**".equals(channel) || "/*".equals(channel)))
855 return false;
856 return client!=null && !channel.startsWith(Bayeux.META_SLASH);
857 }
858
859 public boolean canPublish(Client client, String channel, Message message)
860 {
861 return client!=null || client==null && Bayeux.META_HANDSHAKE.equals(channel);
862 }
863
864 }
865
866
867
868
869 protected abstract class Handler
870 {
871 abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
872 abstract ChannelId getMetaChannelId();
873 void unknownClient(Transport transport,String channel) throws IOException
874 {
875 MessageImpl reply=newMessage();
876
877 reply.put(CHANNEL_FIELD,channel);
878 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
879 reply.put(ERROR_FIELD,"402::Unknown client");
880 reply.put("advice",_handshakeAdvice);
881 transport.send(reply);
882 }
883 }
884
885
886
887 protected class ConnectHandler extends Handler
888 {
889 protected String _metaChannel=META_CONNECT;
890
891 @Override
892 ChannelId getMetaChannelId()
893 {
894 return META_CONNECT_ID;
895 }
896
897 @Override
898 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
899 {
900 if (client==null)
901 {
902 unknownClient(transport,_metaChannel);
903 return;
904 }
905
906
907 String type=client.getConnectionType();
908 boolean polling=true;
909 if (type==null)
910 {
911 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
912 client.setConnectionType(type);
913 polling=false;
914 }
915
916 Object advice = message.get(ADVICE_FIELD);
917 if (advice!=null)
918 {
919 Long timeout=(Long)((Map)advice).get("timeout");
920 if (timeout!=null && timeout.longValue()>0)
921 client.setTimeout(timeout.longValue());
922 else
923 client.setTimeout(0);
924 }
925 else
926 client.setTimeout(0);
927
928 advice=null;
929
930
931 if (polling && _multiFrameInterval>0 && client.getBrowserId()!=null)
932 {
933 List<String> clients=clientsOnBrowser(client.getBrowserId());
934 int count=clients.size();
935 if (count>1)
936 {
937 polling=clients.get(0).equals(client.getId());
938 advice=client.getAdvice();
939 if (advice==null)
940 advice=_multiFrameAdvice;
941 else
942 advice=multiFrameAdvice((JSON.Literal)advice);
943 }
944 }
945
946 synchronized(this)
947 {
948 if (advice==null)
949 {
950 if (_adviceVersion!=client._adviseVersion)
951 {
952 advice=_advice;
953 client._adviseVersion=_adviceVersion;
954 }
955 }
956 else
957 client._adviseVersion=-1;
958 }
959
960
961 String id=message.getId();
962
963 Message reply=newMessage(message);
964
965 reply.put(CHANNEL_FIELD,META_CONNECT);
966 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
967 if (advice!=null)
968 reply.put(ADVICE_FIELD,advice);
969 if (id!=null)
970 reply.put(ID_FIELD,id);
971
972 if (polling)
973 transport.setPollReply(reply);
974 else
975 {
976 for (Extension e:_extensions)
977 reply=e.sendMeta(reply);
978 transport.send(reply);
979 }
980 }
981 }
982
983
984
985 protected class DisconnectHandler extends Handler
986 {
987 @Override
988 ChannelId getMetaChannelId()
989 {
990 return META_DISCONNECT_ID;
991 }
992
993 @Override
994 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
995 {
996 if (client==null)
997 {
998 unknownClient(transport,META_DISCONNECT);
999 return;
1000 }
1001 if (isLogInfo())
1002 logInfo("Disconnect "+client.getId());
1003
1004 client.remove(false);
1005
1006 Message reply=newMessage(message);
1007 reply.put(CHANNEL_FIELD,META_DISCONNECT);
1008 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1009 String id=message.getId();
1010 if (id!=null)
1011 reply.put(ID_FIELD,id);
1012
1013 for (Extension e:_extensions)
1014 reply=e.sendMeta(reply);
1015
1016 Message pollReply = transport.getPollReply();
1017 if (pollReply!=null)
1018 {
1019 for (Extension e:_extensions)
1020 pollReply=e.sendMeta(pollReply);
1021 transport.send(pollReply);
1022 transport.setPollReply(null);
1023 }
1024 transport.send(reply);
1025 }
1026 }
1027
1028
1029
1030
1031 protected class HandshakeHandler extends Handler
1032 {
1033 @Override
1034 ChannelId getMetaChannelId()
1035 {
1036 return META_HANDSHAKE_ID;
1037 }
1038
1039 @Override
1040 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1041 {
1042 if (client!=null)
1043 throw new IllegalStateException();
1044
1045 if (_securityPolicy!=null && !_securityPolicy.canHandshake(message))
1046 {
1047 Message reply=newMessage(message);
1048 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1049 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1050 reply.put(ERROR_FIELD,"403::Handshake denied");
1051
1052 for (Extension e:_extensions)
1053 reply=e.sendMeta(reply);
1054
1055 transport.send(reply);
1056 return;
1057 }
1058
1059 client=newRemoteClient();
1060
1061 Map<?,?> ext = (Map<?,?>)message.get(EXT_FIELD);
1062
1063 boolean commented=_JSONCommented && ext!=null && Boolean.TRUE.equals(ext.get("json-comment-filtered"));
1064
1065 Message reply=newMessage(message);
1066 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1067 reply.put("version","1.0");
1068 reply.put("minimumVersion","0.9");
1069 if (isJSONCommented())
1070 reply.put(EXT_FIELD,EXT_JSON_COMMENTED);
1071
1072 if (client!=null)
1073 {
1074 reply.put("supportedConnectionTypes",_transports);
1075 reply.put("successful",Boolean.TRUE);
1076 reply.put(CLIENT_FIELD,client.getId());
1077 if (_advice!=null)
1078 reply.put(ADVICE_FIELD,_advice);
1079 client.setJSONCommented(commented);
1080 transport.setJSONCommented(commented);
1081 }
1082 else
1083 {
1084 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1085 if (_advice!=null)
1086 reply.put(ADVICE_FIELD,_advice);
1087 }
1088
1089 if (isLogDebug())
1090 logDebug("handshake.handle: reply="+reply);
1091
1092 String id=message.getId();
1093 if (id!=null)
1094 reply.put(ID_FIELD,id);
1095
1096 for (Extension e:_extensions)
1097 reply=e.sendMeta(reply);
1098 transport.send(reply);
1099 }
1100 }
1101
1102
1103
1104 protected class PublishHandler extends Handler
1105 {
1106 @Override
1107 ChannelId getMetaChannelId()
1108 {
1109 return null;
1110 }
1111
1112 @Override
1113 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1114 {
1115 String channel_id=message.getChannel();
1116
1117 if (client==null && message.containsKey(CLIENT_FIELD))
1118 {
1119 unknownClient(transport,channel_id);
1120 return;
1121 }
1122
1123 String id=message.getId();
1124
1125 ChannelId cid=getChannelId(channel_id);
1126 Object data=message.get(Bayeux.DATA_FIELD);
1127
1128 Message reply=newMessage(message);
1129 reply.put(CHANNEL_FIELD,channel_id);
1130 if (id!=null)
1131 reply.put(ID_FIELD,id);
1132
1133 if (data!=null&&_securityPolicy.canPublish(client,channel_id,message))
1134 {
1135 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1136
1137 for (Extension e:_extensions)
1138 reply=e.sendMeta(reply);
1139
1140 transport.send(reply);
1141 if (_directDeliver)
1142 {
1143 message.remove(CLIENT_FIELD);
1144 for (Extension e:_extensions)
1145 message=e.send(message);
1146 _root.doDelivery(cid,client,message);
1147 }
1148 else
1149 doPublish(cid,client,data,id==null?null:id);
1150 }
1151 else
1152 {
1153 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1154 reply.put(ERROR_FIELD,"403::Publish denied");
1155
1156 for (Extension e:_extensions)
1157 reply=e.sendMeta(reply);
1158 transport.send(reply);
1159 }
1160 }
1161 }
1162
1163
1164
1165 protected class MetaPublishHandler extends Handler
1166 {
1167 @Override
1168 ChannelId getMetaChannelId()
1169 {
1170 return null;
1171 }
1172
1173 @Override
1174 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1175 {
1176 String channel_id=message.getChannel();
1177
1178 if (client==null && !META_HANDSHAKE.equals(channel_id))
1179 {
1180
1181 return;
1182 }
1183
1184 if(_securityPolicy.canPublish(client,channel_id,message))
1185 {
1186 _root.doDelivery(getChannelId(channel_id),client,message);
1187 }
1188 }
1189 }
1190
1191
1192
1193 protected class SubscribeHandler extends Handler
1194 {
1195 @Override
1196 ChannelId getMetaChannelId()
1197 {
1198 return META_SUBSCRIBE_ID;
1199 }
1200
1201 @Override
1202 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1203 {
1204 if (client==null)
1205 {
1206 unknownClient(transport,META_SUBSCRIBE);
1207 return;
1208 }
1209
1210 String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1211
1212
1213 if (subscribe_id==null)
1214 {
1215 subscribe_id=Long.toString(getRandom(),36);
1216 while (getChannel(subscribe_id)!=null)
1217 subscribe_id=Long.toString(getRandom(),36);
1218 }
1219
1220 ChannelId cid=null;
1221 boolean can_subscribe=false;
1222
1223 if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1224 {
1225 can_subscribe=true;
1226 }
1227 else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1228 {
1229 can_subscribe=false;
1230 }
1231 else
1232 {
1233 cid=getChannelId(subscribe_id);
1234 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1235 }
1236
1237 Message reply=newMessage(message);
1238 reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1239 reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1240
1241 if (can_subscribe)
1242 {
1243 if (cid!=null)
1244 {
1245 ChannelImpl channel=getChannel(cid);
1246 if (channel==null&&_securityPolicy.canCreate(client,subscribe_id,message))
1247 channel=(ChannelImpl)getChannel(subscribe_id, true);
1248
1249 if (channel!=null)
1250 channel.subscribe(client);
1251 else
1252 can_subscribe=false;
1253 }
1254
1255 if (can_subscribe)
1256 {
1257 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1258 }
1259 else
1260 {
1261 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1262 reply.put(ERROR_FIELD,"403::cannot create");
1263 }
1264 }
1265 else
1266 {
1267 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1268 reply.put(ERROR_FIELD,"403::cannot subscribe");
1269
1270 }
1271
1272 String id=message.getId();
1273 if (id!=null)
1274 reply.put(ID_FIELD,id);
1275 for (Extension e:_extensions)
1276 reply=e.sendMeta(reply);
1277 transport.send(reply);
1278 }
1279 }
1280
1281
1282
1283 protected class UnsubscribeHandler extends Handler
1284 {
1285 @Override
1286 ChannelId getMetaChannelId()
1287 {
1288 return META_UNSUBSCRIBE_ID;
1289 }
1290
1291 @Override
1292 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1293 {
1294 if (client==null)
1295 {
1296 unknownClient(transport,META_UNSUBSCRIBE);
1297 return;
1298 }
1299
1300 String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1301 ChannelImpl channel=getChannel(channel_id);
1302 if (channel!=null)
1303 channel.unsubscribe(client);
1304
1305 Message reply=newMessage(message);
1306 reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1307 if (channel!=null)
1308 {
1309 channel.unsubscribe(client);
1310 reply.put(SUBSCRIPTION_FIELD,channel.getId());
1311 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1312 }
1313 else
1314 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1315
1316 String id=message.getId();
1317 if (id!=null)
1318 reply.put(ID_FIELD,id);
1319 for (Extension e:_extensions)
1320 reply=e.sendMeta(reply);
1321 transport.send(reply);
1322 }
1323 }
1324
1325
1326
1327 protected class PingHandler extends Handler
1328 {
1329 @Override
1330 ChannelId getMetaChannelId()
1331 {
1332 return META_PING_ID;
1333 }
1334
1335 @Override
1336 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1337 {
1338 Message reply=newMessage(message);
1339 reply.put(CHANNEL_FIELD,META_PING);
1340 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1341
1342 String id=message.getId();
1343 if (id!=null)
1344 reply.put(ID_FIELD,id);
1345 for (Extension e:_extensions)
1346 reply=e.sendMeta(reply);
1347 transport.send(reply);
1348 }
1349 }
1350
1351
1352
1353
1354 protected class ServiceChannel extends ChannelImpl
1355 {
1356 ServiceChannel(String id)
1357 {
1358 super(id,AbstractBayeux.this);
1359 }
1360
1361
1362
1363
1364
1365 @Override
1366 public void addChild(ChannelImpl channel)
1367 {
1368 super.addChild(channel);
1369 setPersistent(true);
1370 }
1371
1372
1373 @Override
1374 public void subscribe(Client client)
1375 {
1376 if (client.isLocal())
1377 super.subscribe(client);
1378 }
1379
1380 }
1381 }