1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.io.IOException;
18 import java.security.SecureRandom;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import javax.servlet.ServletContext;
31 import javax.servlet.http.HttpServletRequest;
32
33 import org.cometd.Bayeux;
34 import org.cometd.BayeuxListener;
35 import org.cometd.Channel;
36 import org.cometd.ChannelBayeuxListener;
37 import org.cometd.Client;
38 import org.cometd.ClientBayeuxListener;
39 import org.cometd.Extension;
40 import org.cometd.Message;
41 import org.cometd.SecurityPolicy;
42 import org.mortbay.util.LazyList;
43 import org.mortbay.util.ajax.JSON;
44
45
46
47
48
49
50 public abstract class AbstractBayeux extends MessagePool implements Bayeux
51 {
    // Pre-parsed ChannelId instances for the meta channel names declared by Bayeux.
    public static final ChannelId META_ID=new ChannelId(META);
    public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
    public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
    public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
    public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
    public static final ChannelId META_PING_ID=new ChannelId(META_PING);
    public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
    public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
    public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);

    // Meta-channel name -> handler; populated once in the constructor and looked up in handle().
    private final HashMap<String,Handler> _handlers=new HashMap<String,Handler>();
    // Root ("/") of the channel tree; channel lookup, creation, removal and delivery go through it.
    private final ChannelImpl _root=new ChannelImpl("/",this);
    // Registered clients keyed by client id (see addClient/removeClient).
    private final ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
    // Cache of parsed ChannelIds keyed by channel name (see getChannelId).
    protected final ConcurrentHashMap<String,ChannelId> _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
    // Browser id -> ids of the clients registered for that browser (see clientOnBrowser/clientOffBrowser).
    protected final ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
    // Per-thread "current request" slot (see getCurrentRequest/setCurrentRequest).
    protected final ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();
    // Listeners notified from addClient/removeClient.
    protected final List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
    // Listeners notified from addChannel and passed to the root on channel removal.
    protected final List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
    // Handler used by handle() for messages on non-meta channels.
    protected final Handler _publishHandler;
    // Handler used by handle() to deliver meta messages onto their own channel.
    protected final Handler _metaPublishHandler;

    // Policy consulted for handshake, subscribe, create and publish decisions.
    protected SecurityPolicy _securityPolicy=new DefaultPolicy();
    // Current advice literal; replaced by setAdvice().
    protected JSON.Literal _advice;
    // Multi-frame variant of the advice, rebuilt by setAdvice().
    protected JSON.Literal _multiFrameAdvice;
    // Incremented on every setAdvice(); compared with the client's version in ConnectHandler.
    protected int _adviceVersion=0;
    // Advice attached to "Unknown client" replies (see Handler.unknownClient).
    protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
    // Log verbosity: > 0 enables info logging, > 1 enables debug logging.
    protected int _logLevel;
    // Value written as "timeout" into the generated advice.
    protected long _timeout=30000;
    // Value written as "interval" into the generated advice.
    protected long _interval=0;
    // Only stored and returned by this class; NOTE(review): presumably consumed by client/transport code - confirm.
    protected long _maxInterval=10000;
    // Set to true by initialize().
    protected boolean _initialized;
    // When > 0, ConnectHandler applies multi-frame advice and multiFrameAdvice() uses it as the interval.
    protected int _multiFrameInterval=-1;
    // < 0 disables the ChannelId cache; > 0 clears the cache once its size exceeds the limit; 0 means no limit.
    private int _channelIdCacheLimit=0;

    // Only stored and returned by this class (see isRequestAvailable/setRequestAvailable).
    protected boolean _requestAvailable;

    // Servlet context used for all logging; assigned in initialize(), null before that.
    private ServletContext _context;
    // Random source for id generation; created in initialize().
    protected Random _random;
    // Only stored and returned by this class (see getMaxClientQueue/setMaxClientQueue).
    protected int _maxClientQueue=-1;

    // Server-wide extensions, or null when none are registered.
    protected Extension[] _extensions;
    // JSON array of supported transports, sent in successful handshake replies.
    protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");

    // Only stored and returned by this class (see getMaxLazyLatency/setMaxLazyLatency).
    protected int _maxLazyLatency=5000;
96
97
98 protected AbstractBayeux()
99 {
100 _publishHandler=new PublishHandler();
101 _metaPublishHandler=new MetaPublishHandler();
102 _handlers.put(META_HANDSHAKE,new HandshakeHandler());
103 _handlers.put(META_CONNECT,new ConnectHandler());
104 _handlers.put(META_DISCONNECT,new DisconnectHandler());
105 _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
106 _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
107 _handlers.put(META_PING,new PingHandler());
108
109 setTimeout(getTimeout());
110 }
111
112
113 public void addExtension(Extension ext)
114 {
115 _extensions=(Extension[])LazyList.addToArray(_extensions, ext, Extension.class);
116 }
117
118 public void removeExtension(Extension ext)
119 {
120 _extensions = (Extension[])LazyList.removeFromArray(_extensions, ext);
121 }
122
123
124
125
126
127
128 public ChannelImpl getChannel(ChannelId id)
129 {
130 return _root.getChild(id);
131 }
132
133
134 public ChannelImpl getChannel(String id)
135 {
136 ChannelId cid=getChannelId(id);
137 if (cid.depth() == 0)
138 return null;
139 return _root.getChild(cid);
140 }
141
142
143 public Channel getChannel(String id, boolean create)
144 {
145 ChannelImpl channel=getChannel(id);
146
147 if (channel == null && create)
148 {
149 channel=new ChannelImpl(id,this);
150 Channel added =_root.addChild(channel);
151 if (added!=channel)
152 return added;
153 if (isLogInfo())
154 logInfo("newChannel: " + channel);
155 }
156 return channel;
157 }
158
159
160 public ChannelId getChannelId(String id)
161 {
162 if (_channelIdCacheLimit<0)
163 return new ChannelId(id);
164
165 ChannelId cid=_channelIdCache.get(id);
166 if (cid == null)
167 {
168 cid=new ChannelId(id);
169 if (_channelIdCacheLimit>0 && _channelIdCache.size()>_channelIdCacheLimit)
170 _channelIdCache.clear();
171 ChannelId other=_channelIdCache.putIfAbsent(id,cid);
172 if (other!=null)
173 return other;
174 }
175 return cid;
176 }
177
178
179 public Client getClient(String client_id)
180 {
181 if (client_id == null)
182 return null;
183 return _clients.get(client_id);
184 }
185
186
187 public Set<String> getClientIDs()
188 {
189 return _clients.keySet();
190 }
191
192
193
194
195
196
197 public long getMaxInterval()
198 {
199 return _maxInterval;
200 }
201
202
203
204
205
206 public int getLogLevel()
207 {
208 return _logLevel;
209 }
210
211
212 public SecurityPolicy getSecurityPolicy()
213 {
214 return _securityPolicy;
215 }
216
217
218 public long getTimeout()
219 {
220 return _timeout;
221 }
222
223
224 public long getInterval()
225 {
226 return _interval;
227 }
228
229
230
231
232
233
234
235 public boolean isDirectDeliver()
236 {
237 return false;
238 }
239
240
241
242
243
244
245
246
247
248 public void setDirectDeliver(boolean directDeliver)
249 {
250 _context.log("directDeliver is deprecated");
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264 public String handle(ClientImpl client, Transport transport, Message message) throws IOException
265 {
266 String channel_id = message.getChannel();
267 if (channel_id == null)
268 throw new IllegalArgumentException("Message without channel: "+message);
269
270 Handler handler = _handlers.get(channel_id);
271 if (handler != null)
272 {
273 message=extendRcvMeta(client,message);
274 handler.handle(client,transport,message);
275 _metaPublishHandler.handle(client, transport, message);
276 }
277 else if (channel_id.startsWith(META_SLASH))
278 {
279 message = extendRcvMeta(client, message);
280 _metaPublishHandler.handle(client, transport, message);
281 }
282 else
283 {
284
285 handler = _publishHandler;
286 message = extendRcv(client,message);
287 handler.handle(client, transport, message);
288 }
289
290 return channel_id;
291 }
292
293
294 public boolean hasChannel(String id)
295 {
296 ChannelId cid=getChannelId(id);
297 return _root.getChild(cid) != null;
298 }
299
300
301 public boolean isInitialized()
302 {
303 return _initialized;
304 }
305
306
307
308
309
310
311 public boolean isJSONCommented()
312 {
313 return false;
314 }
315
316
317 public boolean isLogDebug()
318 {
319 return _logLevel > 1;
320 }
321
322
323 public boolean isLogInfo()
324 {
325 return _logLevel > 0;
326 }
327
328
329 public void logDebug(String message)
330 {
331 if (_logLevel > 1)
332 _context.log(message);
333 }
334
335
336 public void logDebug(String message, Throwable th)
337 {
338 if (_logLevel > 1)
339 _context.log(message,th);
340 }
341
342
343 public void logWarn(String message, Throwable th)
344 {
345 _context.log(message + ": " + th.toString());
346 }
347
348
349 public void logWarn(String message)
350 {
351 _context.log(message);
352 }
353
354
355 public void logInfo(String message)
356 {
357 if (_logLevel > 0)
358 _context.log(message);
359 }
360
361
362 public Client newClient(String idPrefix)
363 {
364 ClientImpl client=new ClientImpl(this,idPrefix);
365 addClient(client,idPrefix);
366 return client;
367 }
368
369
    /**
     * Creates the client object for a remote peer. Called by the handshake
     * handler after the security policy has accepted the handshake; a
     * {@code null} result makes the handshake reply unsuccessful.
     *
     * @return the new client, or {@code null}
     */
    public abstract ClientImpl newRemoteClient();
371
372
373
374
375
376
377
378
379
380
381
382 public Transport newTransport(ClientImpl client, Map<?,?> message)
383 {
384 if (isLogDebug())
385 logDebug("newTransport: client=" + client + ",message=" + message);
386
387 Transport result;
388
389 String type = client == null ? null : client.getConnectionType();
390 if (type == null)
391 {
392
393 type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
394 }
395 if (type == null)
396 {
397
398 Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
399 if (types != null)
400 {
401 List supportedTypes;
402 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
403 else if (types instanceof List) supportedTypes = (List)types;
404 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
405 else supportedTypes = Collections.emptyList();
406
407 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
408 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
409 }
410 }
411 if (type == null)
412 {
413
414 String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
415 type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
416 }
417
418 if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
419 {
420 String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
421 if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
422 result = new JSONPTransport(jsonp);
423 }
424 else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
425 {
426 result = new JSONTransport();
427 }
428 else
429 {
430 throw new IllegalArgumentException("Unsupported transport type " + type);
431 }
432
433 if (isLogDebug())
434 logDebug("newTransport: result="+result);
435
436 return result;
437 }
438
439
440
441
442
443
444
445
446
447
448
449 protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
450 {
451 final MessageImpl message=newMessage();
452 message.put(CHANNEL_FIELD,to.toString());
453
454 if (msgId == null)
455 {
456 long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
457 id=id < 0?-id:id;
458 message.put(ID_FIELD,Long.toString(id,36));
459 }
460 else
461 message.put(ID_FIELD,msgId);
462 message.put(DATA_FIELD,data);
463
464 message.setLazy(lazy);
465
466 final Message m=extendSendBayeux(from,message);
467
468 if (m != null)
469 _root.doDelivery(to,from,m);
470 if (m instanceof MessageImpl)
471 ((MessageImpl)m).decRef();
472 }
473
474
475 public boolean removeChannel(ChannelImpl channel)
476 {
477 return _root.doRemove(channel,_channelListeners);
478 }
479
480
481 protected void addChannel(ChannelImpl channel)
482 {
483 for (ChannelBayeuxListener l : _channelListeners)
484 l.channelAdded(channel);
485 }
486
487
488 protected String newClientId(long variation, String idPrefix)
489 {
490 if (idPrefix == null)
491 return Long.toString(getRandom(),36) + Long.toString(variation,36);
492 else
493 return idPrefix + "_" + Long.toString(getRandom(),36);
494 }
495
496
497 protected void addClient(ClientImpl client, String idPrefix)
498 {
499 while(true)
500 {
501 String id=newClientId(client.hashCode(),idPrefix);
502 client.setId(id);
503
504 ClientImpl other=_clients.putIfAbsent(id,client);
505 if (other == null)
506 {
507 for (ClientBayeuxListener l : _clientListeners)
508 l.clientAdded(client);
509 if (isLogInfo())
510 logInfo("Added client: " + client);
511 return;
512 }
513 }
514 }
515
516
517 public Client removeClient(String client_id)
518 {
519 ClientImpl client;
520 if (client_id == null)
521 return null;
522 client=_clients.remove(client_id);
523 if (client != null)
524 {
525 for (ClientBayeuxListener l : _clientListeners)
526 l.clientRemoved(client);
527 client.unsubscribeAll();
528 if (isLogInfo())
529 logInfo("Removed client: " + client);
530 }
531 return client;
532 }
533
534
535
536
537
538
539
540 public void setMaxInterval(long ms)
541 {
542 _maxInterval=ms;
543 }
544
545
546
547
548
549 public void setJSONCommented(boolean commented)
550 {
551 if (commented)
552 _context.log("JSONCommented is deprecated");
553 }
554
555
556
557
558
559
560 public void setLogLevel(int logLevel)
561 {
562 _logLevel=logLevel;
563 }
564
565
566 public void setSecurityPolicy(SecurityPolicy securityPolicy)
567 {
568 _securityPolicy=securityPolicy;
569 }
570
571
572 public void setTimeout(long ms)
573 {
574 _timeout=ms;
575 generateAdvice();
576 }
577
578
579 public void setInterval(long ms)
580 {
581 _interval=ms;
582 generateAdvice();
583 }
584
585
586
587
588
589
590
591
592
593
594 public void setMultiFrameInterval(int multiFrameInterval)
595 {
596 _multiFrameInterval=multiFrameInterval;
597 generateAdvice();
598 }
599
600
601
602
603
604 public int getMultiFrameInterval()
605 {
606 return _multiFrameInterval;
607 }
608
609
610
611
612
613 public int getChannelIdCacheLimit()
614 {
615 return _channelIdCacheLimit;
616 }
617
618
619
620
621
622
623
624
625 public void setChannelIdCacheLimit(int channelIdCacheLimit)
626 {
627 this._channelIdCacheLimit = channelIdCacheLimit;
628 }
629
630
631 void generateAdvice()
632 {
633 setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
634 }
635
636
637 public void setAdvice(JSON.Literal advice)
638 {
639 synchronized(this)
640 {
641 _adviceVersion++;
642 _advice=advice;
643 _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
644 }
645 }
646
647
648 private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
649 {
650 Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
651 a.put("multiple-clients",Boolean.TRUE);
652 if (_multiFrameInterval > 0)
653 {
654 a.put("reconnect","retry");
655 a.put("interval",_multiFrameInterval);
656 }
657 else
658 a.put("reconnect","none");
659 return a;
660 }
661
662
663 public JSON.Literal getAdvice()
664 {
665 return _advice;
666 }
667
668
669
670
671
672
673 public boolean isRequestAvailable()
674 {
675 return _requestAvailable;
676 }
677
678
679
680
681
682
683
684 public void setRequestAvailable(boolean requestAvailable)
685 {
686 _requestAvailable=requestAvailable;
687 }
688
689
690
691
692
693
694 public HttpServletRequest getCurrentRequest()
695 {
696 return _request.get();
697 }
698
699
700 void setCurrentRequest(HttpServletRequest request)
701 {
702 _request.set(request);
703 }
704
705
706 public Collection<Channel> getChannels()
707 {
708 List<Channel> channels=new ArrayList<Channel>();
709 _root.getChannels(channels);
710 return channels;
711 }
712
713
714
715
716
717 public int getChannelCount()
718 {
719 return getChannels().size();
720 }
721
722
723 public Collection<Client> getClients()
724 {
725 return new ArrayList<Client>(_clients.values());
726 }
727
728
729
730
731
732 public int getClientCount()
733 {
734 return _clients.size();
735 }
736
737
738 public boolean hasClient(String clientId)
739 {
740 if (clientId == null)
741 return false;
742 return _clients.containsKey(clientId);
743 }
744
745
746 public Channel removeChannel(String channelId)
747 {
748 Channel channel=getChannel(channelId);
749
750 boolean removed=false;
751 if (channel != null)
752 removed=channel.remove();
753
754 if (removed)
755 return channel;
756 else
757 return null;
758 }
759
760
761 protected void initialize(ServletContext context)
762 {
763 synchronized(this)
764 {
765 _initialized=true;
766 _context=context;
767 try
768 {
769 _random=SecureRandom.getInstance("SHA1PRNG");
770 }
771 catch(Exception e)
772 {
773 context.log("Could not get secure random for ID generation",e);
774 _random=new Random();
775 }
776 _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
777
778 _root.addChild(new ServiceChannel(Bayeux.SERVICE));
779
780 }
781 }
782
783
784 long getRandom()
785 {
786 long l=_random.nextLong();
787 return l < 0?-l:l;
788 }
789
790
791 void clientOnBrowser(String browserId, String clientId)
792 {
793 List<String> clients=_browser2client.get(browserId);
794 if (clients == null)
795 {
796 List<String> new_clients=new CopyOnWriteArrayList<String>();
797 clients=_browser2client.putIfAbsent(browserId,new_clients);
798 if (clients == null)
799 clients=new_clients;
800 }
801 clients.add(clientId);
802 }
803
804
805 void clientOffBrowser(String browserId, String clientId)
806 {
807 List<String> clients=_browser2client.get(browserId);
808 if (clients != null)
809 {
810 clients.remove(clientId);
811 if (clients.isEmpty())
812 _browser2client.remove(browserId);
813 }
814 }
815
816
817 List<String> clientsOnBrowser(String browserId)
818 {
819 return _browser2client.get(browserId);
820 }
821
822
823 public void addListener(BayeuxListener listener)
824 {
825 if (listener instanceof ClientBayeuxListener)
826 _clientListeners.add((ClientBayeuxListener)listener);
827 if (listener instanceof ChannelBayeuxListener)
828 _channelListeners.add((ChannelBayeuxListener)listener);
829 }
830
831 public void removeListener(BayeuxListener listener)
832 {
833 if (listener instanceof ClientBayeuxListener)
834 _clientListeners.remove(listener);
835 if (listener instanceof ChannelBayeuxListener)
836 _channelListeners.remove(listener);
837 }
838
839
840 public int getMaxClientQueue()
841 {
842 return _maxClientQueue;
843 }
844
845
846 public void setMaxClientQueue(int size)
847 {
848 _maxClientQueue=size;
849 }
850
851
852 protected Message extendRcv(ClientImpl from, Message message)
853 {
854 if (_extensions != null)
855 {
856 for (int i=_extensions.length; message != null && i-- > 0;)
857 message=_extensions[i].rcv(from,message);
858 }
859
860 if (from != null)
861 {
862 Extension[] client_exs=from.getExtensions();
863 if (client_exs != null)
864 {
865 for (int i=client_exs.length; message != null && i-- > 0;)
866 message=client_exs[i].rcv(from,message);
867 }
868 }
869
870 return message;
871 }
872
873
874 protected Message extendRcvMeta(ClientImpl from, Message message)
875 {
876 if (_extensions != null)
877 {
878 for (int i=_extensions.length; message != null && i-- > 0;)
879 message=_extensions[i].rcvMeta(from,message);
880 }
881
882 if (from != null)
883 {
884 Extension[] client_exs=from.getExtensions();
885 if (client_exs != null)
886 {
887 for (int i=client_exs.length; message != null && i-- > 0;)
888 message=client_exs[i].rcvMeta(from,message);
889 }
890 }
891 return message;
892 }
893
894
895 protected Message extendSendBayeux(Client from, Message message)
896 {
897 if (_extensions != null)
898 {
899 for (int i=0; message != null && i < _extensions.length; i++)
900 {
901 message=_extensions[i].send(from,message);
902 }
903 }
904
905 return message;
906 }
907
908
909 public Message extendSendClient(Client from, ClientImpl to, Message message)
910 {
911 if (to != null)
912 {
913 Extension[] client_exs=to.getExtensions();
914 if (client_exs != null)
915 {
916 for (int i=0; message != null && i < client_exs.length; i++)
917 message=client_exs[i].send(from,message);
918 }
919 }
920
921 return message;
922 }
923
924
925 public Message extendSendMeta(ClientImpl from, Message message)
926 {
927 if (_extensions != null)
928 {
929 for (int i=0; message != null && i < _extensions.length; i++)
930 message=_extensions[i].sendMeta(from,message);
931 }
932
933 if (from != null)
934 {
935 Extension[] client_exs=from.getExtensions();
936 if (client_exs != null)
937 {
938 for (int i=0; message != null && i < client_exs.length; i++)
939 message=client_exs[i].sendMeta(from,message);
940 }
941 }
942
943 return message;
944 }
945
946
947
948
949
950
951 public int getMaxLazyLatency()
952 {
953 return _maxLazyLatency;
954 }
955
956
957
958
959
960
961 public void setMaxLazyLatency(int ms)
962 {
963 _maxLazyLatency = ms;
964 }
965
966
967
968 public static class DefaultPolicy implements SecurityPolicy
969 {
970 public boolean canHandshake(Message message)
971 {
972 return true;
973 }
974
975 public boolean canCreate(Client client, String channel, Message message)
976 {
977 return client != null && !channel.startsWith(Bayeux.META_SLASH);
978 }
979
980 public boolean canSubscribe(Client client, String channel, Message message)
981 {
982 if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
983 return false;
984 return client != null && !channel.startsWith(Bayeux.META_SLASH);
985 }
986
987 public boolean canPublish(Client client, String channel, Message message)
988 {
989 return client != null || Bayeux.META_HANDSHAKE.equals(channel);
990 }
991
992 }
993
994
995
996 protected abstract class Handler
997 {
998 abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
999
1000 abstract ChannelId getMetaChannelId();
1001
1002 protected boolean isClientUnknown(Client client)
1003 {
1004 return client == null || !hasClient(client.getId());
1005 }
1006
1007 void unknownClient(Transport transport, String channel) throws IOException
1008 {
1009 MessageImpl reply=newMessage();
1010
1011 reply.put(CHANNEL_FIELD,channel);
1012 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1013 reply.put(ERROR_FIELD,"402::Unknown client");
1014 reply.put("advice",_handshakeAdvice);
1015 transport.send(reply);
1016 reply.decRef();
1017 }
1018
1019 void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1020 {
1021 reply=extendSendMeta(client,reply);
1022 if (reply != null)
1023 {
1024 transport.send(reply);
1025 if (reply instanceof MessageImpl)
1026 ((MessageImpl)reply).decRef();
1027 }
1028 }
1029 }
1030
1031
1032
1033 protected class ConnectHandler extends Handler
1034 {
1035 protected String _metaChannel=META_CONNECT;
1036
1037 @Override
1038 ChannelId getMetaChannelId()
1039 {
1040 return META_CONNECT_ID;
1041 }
1042
1043 @Override
1044 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1045 {
1046 if (isClientUnknown(client))
1047 {
1048 unknownClient(transport,_metaChannel);
1049 return;
1050 }
1051
1052
1053 String type=client.getConnectionType();
1054 boolean polling=true;
1055 if (type == null)
1056 {
1057 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1058 client.setConnectionType(type);
1059 polling=false;
1060 }
1061
1062 Object advice=message.get(ADVICE_FIELD);
1063 if (advice != null)
1064 {
1065 Long timeout=(Long)((Map)advice).get("timeout");
1066 if (timeout != null && timeout.longValue() >= 0L)
1067 client.setTimeout(timeout.longValue());
1068 else
1069 client.setTimeout(-1);
1070
1071 Long interval=(Long)((Map)advice).get("interval");
1072 if (interval != null && interval.longValue() >= 0L)
1073 client.setInterval(interval.longValue());
1074 else
1075 client.setInterval(-1);
1076 }
1077 else
1078 {
1079 client.setTimeout(-1);
1080 client.setInterval(-1);
1081 }
1082
1083 advice=null;
1084
1085
1086 if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1087 {
1088 List<String> clients=clientsOnBrowser(client.getBrowserId());
1089 if (clients != null && clients.size() > 1)
1090 {
1091 polling=clients.get(0).equals(client.getId());
1092 advice=client.getAdvice();
1093 if (advice == null)
1094 advice=_multiFrameAdvice;
1095 else
1096
1097 advice=multiFrameAdvice((JSON.Literal)advice);
1098 }
1099 }
1100
1101 synchronized(this)
1102 {
1103 if (advice == null)
1104 {
1105 if (_adviceVersion != client._adviseVersion)
1106 {
1107 advice=_advice;
1108 client._adviseVersion=_adviceVersion;
1109 }
1110 }
1111 else
1112 client._adviseVersion=-1;
1113 }
1114
1115
1116 String id=message.getId();
1117
1118 Message reply=newMessage(message);
1119
1120 reply.put(CHANNEL_FIELD,META_CONNECT);
1121 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1122 if (advice != null)
1123 reply.put(ADVICE_FIELD,advice);
1124 if (id != null)
1125 reply.put(ID_FIELD,id);
1126
1127 if (polling)
1128 transport.setMetaConnectReply(reply);
1129 else
1130 sendMetaReply(client,reply,transport);
1131 }
1132 }
1133
1134
1135
1136 protected class DisconnectHandler extends Handler
1137 {
1138 @Override
1139 ChannelId getMetaChannelId()
1140 {
1141 return META_DISCONNECT_ID;
1142 }
1143
1144 @Override
1145 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1146 {
1147 if (isClientUnknown(client))
1148 {
1149 unknownClient(transport,META_DISCONNECT);
1150 return;
1151 }
1152 if (isLogInfo())
1153 logInfo("Disconnect " + client.getId());
1154
1155 client.remove(false);
1156
1157 Message reply=newMessage(message);
1158 reply.put(CHANNEL_FIELD,META_DISCONNECT);
1159 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1160 String id=message.getId();
1161 if (id != null)
1162 reply.put(ID_FIELD,id);
1163
1164 Message pollReply=transport.getMetaConnectReply();
1165 if (pollReply != null)
1166 {
1167 transport.setMetaConnectReply(null);
1168 sendMetaReply(client,pollReply,transport);
1169 }
1170 sendMetaReply(client,reply,transport);
1171 }
1172 }
1173
1174
1175
1176 protected class HandshakeHandler extends Handler
1177 {
1178 @Override
1179 ChannelId getMetaChannelId()
1180 {
1181 return META_HANDSHAKE_ID;
1182 }
1183
1184 @Override
1185 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1186 {
1187 if (client != null)
1188 throw new IllegalStateException();
1189
1190 if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1191 {
1192 Message reply=newMessage(message);
1193 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1194 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1195 reply.put(ERROR_FIELD,"403::Handshake denied");
1196
1197 sendMetaReply(client,reply,transport);
1198 return;
1199 }
1200
1201 client=newRemoteClient();
1202
1203 Message reply=newMessage(message);
1204 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1205 reply.put(VERSION_FIELD,"1.0");
1206 reply.put(MIN_VERSION_FIELD,"0.9");
1207
1208 if (client != null)
1209 {
1210 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1211 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1212 reply.put(CLIENT_FIELD,client.getId());
1213 if (_advice != null)
1214 reply.put(ADVICE_FIELD,_advice);
1215 }
1216 else
1217 {
1218 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1219 if (_advice != null)
1220 reply.put(ADVICE_FIELD,_advice);
1221 }
1222
1223 if (isLogDebug())
1224 logDebug("handshake.handle: reply=" + reply);
1225
1226 String id=message.getId();
1227 if (id != null)
1228 reply.put(ID_FIELD,id);
1229
1230 sendMetaReply(client,reply,transport);
1231 }
1232 }
1233
1234
1235
1236 protected class PublishHandler extends Handler
1237 {
1238 @Override
1239 ChannelId getMetaChannelId()
1240 {
1241 return null;
1242 }
1243
1244 @Override
1245 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1246 {
1247 if (message == null)
1248 {
1249 Message reply = newMessage(message);
1250 reply.put(SUCCESSFUL_FIELD, Boolean.FALSE);
1251 reply.put(ERROR_FIELD, "404::Message deleted");
1252 sendMetaReply(client, reply, transport);
1253 return;
1254 }
1255
1256 String channel_id=message.getChannel();
1257 if (isClientUnknown(client) && message.containsKey(CLIENT_FIELD))
1258 {
1259 unknownClient(transport,channel_id);
1260 return;
1261 }
1262
1263 String id=message.getId();
1264
1265 ChannelId cid=getChannelId(channel_id);
1266 Object data=message.get(Bayeux.DATA_FIELD);
1267
1268 Message reply=newMessage(message);
1269 reply.put(CHANNEL_FIELD,channel_id);
1270 if (id != null)
1271 reply.put(ID_FIELD,id);
1272
1273 if (data == null)
1274 {
1275 message=null;
1276 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1277 reply.put(ERROR_FIELD,"403::No data");
1278 }
1279 else if (!_securityPolicy.canPublish(client,channel_id,message))
1280 {
1281 message=null;
1282 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1283 reply.put(ERROR_FIELD,"403::Publish denied");
1284 }
1285 else
1286 {
1287 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1288 }
1289
1290 sendMetaReply(client,reply,transport);
1291
1292 if (message != null)
1293 _root.doDelivery(cid,client,message);
1294 }
1295 }
1296
1297
1298
1299 protected class MetaPublishHandler extends Handler
1300 {
1301 @Override
1302 ChannelId getMetaChannelId()
1303 {
1304 return null;
1305 }
1306
1307 @Override
1308 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1309 {
1310 String channel_id=message.getChannel();
1311
1312 if (isClientUnknown(client) && !META_HANDSHAKE.equals(channel_id) && !META_DISCONNECT.equals(channel_id))
1313 {
1314
1315 return;
1316 }
1317
1318 if (_securityPolicy.canPublish(client,channel_id,message))
1319 {
1320 _root.doDelivery(getChannelId(channel_id),client,message);
1321 }
1322 }
1323 }
1324
1325
1326
1327 protected class SubscribeHandler extends Handler
1328 {
1329 @Override
1330 ChannelId getMetaChannelId()
1331 {
1332 return META_SUBSCRIBE_ID;
1333 }
1334
1335 @Override
1336 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1337 {
1338 if (isClientUnknown(client))
1339 {
1340 unknownClient(transport,META_SUBSCRIBE);
1341 return;
1342 }
1343
1344 String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1345
1346
1347 if (subscribe_id == null)
1348 {
1349 subscribe_id=Long.toString(getRandom(),36);
1350 while(getChannel(subscribe_id) != null)
1351 subscribe_id=Long.toString(getRandom(),36);
1352 }
1353
1354 ChannelId cid=null;
1355 boolean can_subscribe=false;
1356
1357 if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1358 {
1359 can_subscribe=true;
1360 }
1361 else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1362 {
1363 can_subscribe=false;
1364 }
1365 else
1366 {
1367 cid=getChannelId(subscribe_id);
1368 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1369 }
1370
1371 Message reply=newMessage(message);
1372 reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1373 reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1374
1375 if (can_subscribe)
1376 {
1377 if (cid != null)
1378 {
1379 ChannelImpl channel=getChannel(cid);
1380 if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1381 channel=(ChannelImpl)getChannel(subscribe_id,true);
1382
1383 if (channel != null)
1384 {
1385
1386
1387
1388
1389 if (isClientUnknown(client))
1390 {
1391 unknownClient(transport, META_SUBSCRIBE);
1392 return;
1393 }
1394 else
1395 {
1396 channel.subscribe(client);
1397 }
1398 }
1399 else
1400 {
1401 can_subscribe=false;
1402 }
1403 }
1404
1405 if (can_subscribe)
1406 {
1407 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1408 }
1409 else
1410 {
1411 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1412 reply.put(ERROR_FIELD,"403::cannot create");
1413 }
1414 }
1415 else
1416 {
1417 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1418 reply.put(ERROR_FIELD,"403::cannot subscribe");
1419
1420 }
1421
1422 String id=message.getId();
1423 if (id != null)
1424 reply.put(ID_FIELD,id);
1425
1426 sendMetaReply(client,reply,transport);
1427 }
1428 }
1429
1430
1431
1432 protected class UnsubscribeHandler extends Handler
1433 {
1434 @Override
1435 ChannelId getMetaChannelId()
1436 {
1437 return META_UNSUBSCRIBE_ID;
1438 }
1439
1440 @Override
1441 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1442 {
1443 if (isClientUnknown(client))
1444 {
1445 unknownClient(transport,META_UNSUBSCRIBE);
1446 return;
1447 }
1448
1449 String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1450 ChannelImpl channel=getChannel(channel_id);
1451 if (channel != null)
1452 channel.unsubscribe(client);
1453
1454 Message reply=newMessage(message);
1455 reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1456 reply.put(SUBSCRIPTION_FIELD,channel_id);
1457 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1458
1459 String id=message.getId();
1460 if (id != null)
1461 reply.put(ID_FIELD,id);
1462
1463 sendMetaReply(client,reply,transport);
1464 }
1465 }
1466
1467
1468
1469 protected class PingHandler extends Handler
1470 {
1471 @Override
1472 ChannelId getMetaChannelId()
1473 {
1474 return META_PING_ID;
1475 }
1476
1477 @Override
1478 public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1479 {
1480 Message reply=newMessage(message);
1481 reply.put(CHANNEL_FIELD,META_PING);
1482 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1483
1484 String id=message.getId();
1485 if (id != null)
1486 reply.put(ID_FIELD,id);
1487
1488 sendMetaReply(client,reply,transport);
1489 }
1490 }
1491
1492
1493
1494 protected class ServiceChannel extends ChannelImpl
1495 {
1496 ServiceChannel(String id)
1497 {
1498 super(id,AbstractBayeux.this);
1499 setPersistent(true);
1500 }
1501
1502
1503 @Override
1504 public ChannelImpl addChild(ChannelImpl channel)
1505 {
1506 channel.setPersistent(true);
1507 return super.addChild(channel);
1508 }
1509
1510
1511 @Override
1512 public void subscribe(Client client)
1513 {
1514 if (client.isLocal())
1515 super.subscribe(client);
1516 }
1517
1518 }
1519 }