View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.io.IOException;
18  import java.security.SecureRandom;
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import javax.servlet.ServletContext;
31  import javax.servlet.http.HttpServletRequest;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.BayeuxListener;
35  import org.cometd.Channel;
36  import org.cometd.ChannelBayeuxListener;
37  import org.cometd.Client;
38  import org.cometd.ClientBayeuxListener;
39  import org.cometd.Extension;
40  import org.cometd.Message;
41  import org.cometd.SecurityPolicy;
42  import org.mortbay.util.LazyList;
43  import org.mortbay.util.ajax.JSON;
44  
45  /* ------------------------------------------------------------ */
46  /**
47   * @author gregw
48   * @author aabeling: added JSONP transport
49   */
50  public abstract class AbstractBayeux extends MessagePool implements Bayeux
51  {
52      public static final ChannelId META_ID=new ChannelId(META);
53      public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
54      public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
55      public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
56      public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
57      public static final ChannelId META_PING_ID=new ChannelId(META_PING);
58      public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
59      public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
60      public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);
61  
62      private final HashMap<String,Handler> _handlers=new HashMap<String,Handler>();
63      private final ChannelImpl _root=new ChannelImpl("/",this);
64      private final ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
65      protected final ConcurrentHashMap<String,ChannelId> _channelIdCache=new ConcurrentHashMap<String,ChannelId>();
66      protected final ConcurrentHashMap<String,List<String>> _browser2client=new ConcurrentHashMap<String,List<String>>();
67      protected final ThreadLocal<HttpServletRequest> _request=new ThreadLocal<HttpServletRequest>();
68      protected final List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
69      protected final List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
70      protected final Handler _publishHandler;
71      protected final Handler _metaPublishHandler;
72  
73      protected SecurityPolicy _securityPolicy=new DefaultPolicy();
74      protected JSON.Literal _advice;
75      protected JSON.Literal _multiFrameAdvice;
76      protected int _adviceVersion=0;
77      protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
78      protected int _logLevel;
79      protected long _timeout=30000;
80      protected long _interval=0;
81      protected long _maxInterval=10000;
82      protected boolean _initialized;
83      protected int _multiFrameInterval=-1;
84      private int _channelIdCacheLimit=0;
85  
86      protected boolean _requestAvailable;
87  
88      private ServletContext _context;
89      protected Random _random;
90      protected int _maxClientQueue=-1;
91  
92      protected Extension[] _extensions;
93      protected JSON.Literal _transports=new JSON.Literal("[\"" + Bayeux.TRANSPORT_LONG_POLL + "\",\"" + Bayeux.TRANSPORT_CALLBACK_POLL + "\"]");
94  
95      protected int _maxLazyLatency=5000;
96  
97      /* ------------------------------------------------------------ */
98      protected AbstractBayeux()
99      {
100         _publishHandler=new PublishHandler();
101         _metaPublishHandler=new MetaPublishHandler();
102         _handlers.put(META_HANDSHAKE,new HandshakeHandler());
103         _handlers.put(META_CONNECT,new ConnectHandler());
104         _handlers.put(META_DISCONNECT,new DisconnectHandler());
105         _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
106         _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
107         _handlers.put(META_PING,new PingHandler());
108 
109         setTimeout(getTimeout());
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void addExtension(Extension ext)
114     {
115         _extensions=(Extension[])LazyList.addToArray(_extensions, ext, Extension.class);
116     }
117 
118     public void removeExtension(Extension ext)
119     {
120         _extensions = (Extension[])LazyList.removeFromArray(_extensions, ext);
121     }
122 
123     /* ------------------------------------------------------------ */
124     /**
125      * @param id the channel id
126      * @return the ChannelImpl instance with the given id
127      */
128     public ChannelImpl getChannel(ChannelId id)
129     {
130         return _root.getChild(id);
131     }
132 
133     /* ------------------------------------------------------------ */
134     public ChannelImpl getChannel(String id)
135     {
136         ChannelId cid=getChannelId(id);
137         if (cid.depth() == 0)
138             return null;
139         return _root.getChild(cid);
140     }
141 
142     /* ------------------------------------------------------------ */
143     public Channel getChannel(String id, boolean create)
144     {
145         ChannelImpl channel=getChannel(id);
146 
147         if (channel == null && create)
148         {
149             channel=new ChannelImpl(id,this);
150             Channel added =_root.addChild(channel);
151             if (added!=channel)
152                 return added;
153             if (isLogInfo())
154                 logInfo("newChannel: " + channel);
155         }
156         return channel;
157     }
158 
159     /* ------------------------------------------------------------ */
160     public ChannelId getChannelId(String id)
161     {
162         if (_channelIdCacheLimit<0)
163             return new ChannelId(id);
164 
165         ChannelId cid=_channelIdCache.get(id);
166         if (cid == null)
167         {
168             cid=new ChannelId(id);
169             if (_channelIdCacheLimit>0 && _channelIdCache.size()>_channelIdCacheLimit)
170                 _channelIdCache.clear();
171             ChannelId other=_channelIdCache.putIfAbsent(id,cid);
172             if (other!=null)
173                 return other;
174         }
175         return cid;
176     }
177 
178     /* ------------------------------------------------------------ */
179     public Client getClient(String client_id)
180     {
181         if (client_id == null)
182             return null;
183         return _clients.get(client_id);
184     }
185 
186     /* ------------------------------------------------------------ */
187     public Set<String> getClientIDs()
188     {
189         return _clients.keySet();
190     }
191 
192     /* ------------------------------------------------------------ */
193     /**
194      * @return The maximum time in ms to wait between polls before timing out a
195      *         client
196      */
197     public long getMaxInterval()
198     {
199         return _maxInterval;
200     }
201 
202     /* ------------------------------------------------------------ */
203     /**
204      * @return the logLevel. 0=none, 1=info, 2=debug
205      */
206     public int getLogLevel()
207     {
208         return _logLevel;
209     }
210 
211     /* ------------------------------------------------------------ */
212     public SecurityPolicy getSecurityPolicy()
213     {
214         return _securityPolicy;
215     }
216 
217     /* ------------------------------------------------------------ */
218     public long getTimeout()
219     {
220         return _timeout;
221     }
222 
223     /* ------------------------------------------------------------ */
224     public long getInterval()
225     {
226         return _interval;
227     }
228 
229     /* ------------------------------------------------------------ */
230     /**
231      * @return true if published messages are directly delivered to subscribers.
232      *         False if a new message is to be created that holds only supported
233      *         fields.
234      */
235     public boolean isDirectDeliver()
236     {
237         return false;
238     }
239 
240     /* ------------------------------------------------------------ */
241     /**
242      * @deprecated
243      * @param directDeliver
244      *            true if published messages are directly delivered to
245      *            subscribers. False if a new message is to be created that
246      *            holds only supported fields.
247      */
248     public void setDirectDeliver(boolean directDeliver)
249     {
250         _context.log("directDeliver is deprecated");
251     }
252 
253     /* ------------------------------------------------------------ */
254     /**
255      * Handle a Bayeux message. This is normally only called by the bayeux
256      * servlet or a test harness.
257      *
258      * @param client The client if known
259      * @param transport The transport to use for the message
260      * @param message The bayeux message.
261      * @return the channel id
262      * @throws IOException if the handle fails
263      */
264     public String handle(ClientImpl client, Transport transport, Message message) throws IOException
265     {
266         String channel_id = message.getChannel();
267         if (channel_id == null)
268             throw new IllegalArgumentException("Message without channel: "+message);
269 
270         Handler handler = _handlers.get(channel_id);
271         if (handler != null)
272         {
273             message=extendRcvMeta(client,message);
274             handler.handle(client,transport,message);
275             _metaPublishHandler.handle(client, transport, message);
276         }
277         else if (channel_id.startsWith(META_SLASH))
278         {
279             message = extendRcvMeta(client, message);
280             _metaPublishHandler.handle(client, transport, message);
281         }
282         else
283         {
284             // Non meta channel
285             handler = _publishHandler;
286             message = extendRcv(client,message);
287             handler.handle(client, transport, message);
288         }
289 
290         return channel_id;
291     }
292 
293     /* ------------------------------------------------------------ */
294     public boolean hasChannel(String id)
295     {
296         ChannelId cid=getChannelId(id);
297         return _root.getChild(cid) != null;
298     }
299 
300     /* ------------------------------------------------------------ */
301     public boolean isInitialized()
302     {
303         return _initialized;
304     }
305 
306     /* ------------------------------------------------------------ */
307     /**
308      * @return the commented
309      * @deprecated
310      */
311     public boolean isJSONCommented()
312     {
313         return false;
314     }
315 
316     /* ------------------------------------------------------------ */
317     public boolean isLogDebug()
318     {
319         return _logLevel > 1;
320     }
321 
322     /* ------------------------------------------------------------ */
323     public boolean isLogInfo()
324     {
325         return _logLevel > 0;
326     }
327 
328     /* ------------------------------------------------------------ */
329     public void logDebug(String message)
330     {
331         if (_logLevel > 1)
332             _context.log(message);
333     }
334 
335     /* ------------------------------------------------------------ */
336     public void logDebug(String message, Throwable th)
337     {
338         if (_logLevel > 1)
339             _context.log(message,th);
340     }
341 
342     /* ------------------------------------------------------------ */
343     public void logWarn(String message, Throwable th)
344     {
345         _context.log(message + ": " + th.toString());
346     }
347 
348     /* ------------------------------------------------------------ */
349     public void logWarn(String message)
350     {
351         _context.log(message);
352     }
353 
354     /* ------------------------------------------------------------ */
355     public void logInfo(String message)
356     {
357         if (_logLevel > 0)
358             _context.log(message);
359     }
360 
361     /* ------------------------------------------------------------ */
362     public Client newClient(String idPrefix)
363     {
364         ClientImpl client=new ClientImpl(this,idPrefix);
365         addClient(client,idPrefix);
366         return client;
367     }
368 
369     /* ------------------------------------------------------------ */
370     public abstract ClientImpl newRemoteClient();
371 
372     /* ------------------------------------------------------------ */
373     /**
374      * Create new transport object for a bayeux message
375      *
376      * @param client
377      *            The client
378      * @param message
379      *            the bayeux message
380      * @return the negotiated transport.
381      */
382     public Transport newTransport(ClientImpl client, Map<?,?> message)
383     {
384         if (isLogDebug())
385             logDebug("newTransport: client=" + client + ",message=" + message);
386 
387         Transport result;
388 
389         String type = client == null ? null : client.getConnectionType();
390         if (type == null)
391         {
392             // Check if it is a connect message and we can extract the connection type
393             type = (String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
394         }
395         if (type == null)
396         {
397             // Check if it is an handshake message and we can negotiate the connection type
398             Object types = message.get(Bayeux.SUPPORTED_CONNECTION_TYPES_FIELD);
399             if (types != null)
400             {
401                 List supportedTypes;
402                 if (types instanceof Object[]) supportedTypes = Arrays.asList((Object[])types);
403                 else if (types instanceof List) supportedTypes = (List)types;
404                 else if (types instanceof Map) supportedTypes = new ArrayList(((Map)types).values());
405                 else supportedTypes = Collections.emptyList();
406 
407                 if (supportedTypes.contains(Bayeux.TRANSPORT_LONG_POLL)) type = Bayeux.TRANSPORT_LONG_POLL;
408                 else if (supportedTypes.contains(Bayeux.TRANSPORT_CALLBACK_POLL)) type = Bayeux.TRANSPORT_CALLBACK_POLL;
409             }
410         }
411         if (type == null)
412         {
413             // A normal message, check if it has the jsonp parameter
414             String jsonp = (String) message.get(Bayeux.JSONP_PARAMETER);
415             type = jsonp != null ? Bayeux.TRANSPORT_CALLBACK_POLL : Bayeux.TRANSPORT_LONG_POLL;
416         }
417 
418         if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type))
419         {
420             String jsonp = (String)message.get(Bayeux.JSONP_PARAMETER);
421             if (jsonp == null) throw new IllegalArgumentException("Missing 'jsonp' field in message " + message + " for transport " + type);
422             result = new JSONPTransport(jsonp);
423         }
424         else if (Bayeux.TRANSPORT_LONG_POLL.equals(type))
425         {
426             result = new JSONTransport();
427         }
428         else
429         {
430             throw new IllegalArgumentException("Unsupported transport type " + type);
431         }
432 
433         if (isLogDebug())
434             logDebug("newTransport: result="+result);
435 
436         return result;
437     }
438 
439     /* ------------------------------------------------------------ */
440     /**
441      * Publish data to a channel. Creates a message and delivers it to the root channel.
442      *
443      * @param to the channel id to publish to
444      * @param from the client that publishes
445      * @param data the data to publish
446      * @param msgId the message id
447      * @param lazy whether the message is published lazily
448      */
449     protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
450     {
451         final MessageImpl message=newMessage();
452         message.put(CHANNEL_FIELD,to.toString());
453 
454         if (msgId == null)
455         {
456             long id=message.hashCode() ^ (to == null?0:to.hashCode()) ^ (from == null?0:from.hashCode());
457             id=id < 0?-id:id;
458             message.put(ID_FIELD,Long.toString(id,36));
459         }
460         else
461             message.put(ID_FIELD,msgId);
462         message.put(DATA_FIELD,data);
463 
464         message.setLazy(lazy);
465 
466         final Message m=extendSendBayeux(from,message);
467 
468         if (m != null)
469             _root.doDelivery(to,from,m);
470         if (m instanceof MessageImpl)
471             ((MessageImpl)m).decRef();
472     }
473 
474     /* ------------------------------------------------------------ */
475     public boolean removeChannel(ChannelImpl channel)
476     {
477         return _root.doRemove(channel,_channelListeners);
478     }
479 
480     /* ------------------------------------------------------------ */
481     protected void addChannel(ChannelImpl channel)
482     {
483         for (ChannelBayeuxListener l : _channelListeners)
484             l.channelAdded(channel);
485     }
486 
487     /* ------------------------------------------------------------ */
488     protected String newClientId(long variation, String idPrefix)
489     {
490         if (idPrefix == null)
491             return Long.toString(getRandom(),36) + Long.toString(variation,36);
492         else
493             return idPrefix + "_" + Long.toString(getRandom(),36);
494     }
495 
496     /* ------------------------------------------------------------ */
497     protected void addClient(ClientImpl client, String idPrefix)
498     {
499         while(true)
500         {
501             String id=newClientId(client.hashCode(),idPrefix);
502             client.setId(id);
503 
504             ClientImpl other=_clients.putIfAbsent(id,client);
505             if (other == null)
506             {
507                 for (ClientBayeuxListener l : _clientListeners)
508                     l.clientAdded(client);
509                 if (isLogInfo())
510                     logInfo("Added client: " + client);
511                 return;
512             }
513         }
514     }
515 
516     /* ------------------------------------------------------------ */
517     public Client removeClient(String client_id)
518     {
519         ClientImpl client;
520         if (client_id == null)
521             return null;
522         client=_clients.remove(client_id);
523         if (client != null)
524         {
525             for (ClientBayeuxListener l : _clientListeners)
526                 l.clientRemoved(client);
527             client.unsubscribeAll();
528             if (isLogInfo())
529                 logInfo("Removed client: " + client);
530         }
531         return client;
532     }
533 
534     /* ------------------------------------------------------------ */
535     /**
536      * @param ms
537      *            The maximum time in ms to wait between polls before timing out
538      *            a client
539      */
540     public void setMaxInterval(long ms)
541     {
542         _maxInterval=ms;
543     }
544 
545     /* ------------------------------------------------------------ */
546     /**
547      * @param commented the commented to set
548      */
549     public void setJSONCommented(boolean commented)
550     {
551         if (commented)
552             _context.log("JSONCommented is deprecated");
553     }
554 
555     /* ------------------------------------------------------------ */
556     /**
557      * @param logLevel
558      *            the logLevel: 0=none, 1=info, 2=debug
559      */
560     public void setLogLevel(int logLevel)
561     {
562         _logLevel=logLevel;
563     }
564 
565     /* ------------------------------------------------------------ */
566     public void setSecurityPolicy(SecurityPolicy securityPolicy)
567     {
568         _securityPolicy=securityPolicy;
569     }
570 
571     /* ------------------------------------------------------------ */
572     public void setTimeout(long ms)
573     {
574         _timeout=ms;
575         generateAdvice();
576     }
577 
578     /* ------------------------------------------------------------ */
579     public void setInterval(long ms)
580     {
581         _interval=ms;
582         generateAdvice();
583     }
584 
585     /* ------------------------------------------------------------ */
586     /**
587      * The time a client should delay between reconnects when multiple
588      * connections from the same browser are detected. This effectively produces
589      * traditional polling.
590      *
591      * @param multiFrameInterval
592      *            the multiFrameInterval to set
593      */
594     public void setMultiFrameInterval(int multiFrameInterval)
595     {
596         _multiFrameInterval=multiFrameInterval;
597         generateAdvice();
598     }
599 
600     /* ------------------------------------------------------------ */
601     /**
602      * @return the multiFrameInterval in milliseconds
603      */
604     public int getMultiFrameInterval()
605     {
606         return _multiFrameInterval;
607     }
608 
609     /**
610      * @return the limit of the {@link ChannelId} cache
611      * @see #setChannelIdCacheLimit(int)
612      */
613     public int getChannelIdCacheLimit()
614     {
615         return _channelIdCacheLimit;
616     }
617 
618     /**
619      * Sets the cache limit for {@link ChannelId}s: use -1 to disable the cache, 0 for
620      * an unlimited cache, and any positive value for the limit after which the cache
621      * is cleared.
622      * @param channelIdCacheLimit the limit of the {@link ChannelId} cache
623      * @see #getChannelIdCacheLimit()
624      */
625     public void setChannelIdCacheLimit(int channelIdCacheLimit)
626     {
627         this._channelIdCacheLimit = channelIdCacheLimit;
628     }
629 
630     /* ------------------------------------------------------------ */
631     void generateAdvice()
632     {
633         setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":" + getInterval() + ",\"timeout\":" + getTimeout() + "}"));
634     }
635 
636     /* ------------------------------------------------------------ */
637     public void setAdvice(JSON.Literal advice)
638     {
639         synchronized(this)
640         {
641             _adviceVersion++;
642             _advice=advice;
643             _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
644         }
645     }
646 
647     /* ------------------------------------------------------------ */
648     private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
649     {
650         Map<String,Object> a=(Map<String,Object>)JSON.parse(_advice.toString());
651         a.put("multiple-clients",Boolean.TRUE);
652         if (_multiFrameInterval > 0)
653         {
654             a.put("reconnect","retry");
655             a.put("interval",_multiFrameInterval);
656         }
657         else
658             a.put("reconnect","none");
659         return a;
660     }
661 
662     /* ------------------------------------------------------------ */
663     public JSON.Literal getAdvice()
664     {
665         return _advice;
666     }
667 
668     /* ------------------------------------------------------------ */
669     /**
670      * @return TRUE if {@link #getCurrentRequest()} will return the current
671      *         request
672      */
673     public boolean isRequestAvailable()
674     {
675         return _requestAvailable;
676     }
677 
678     /* ------------------------------------------------------------ */
679     /**
680      * @param requestAvailable
681      *            TRUE if {@link #getCurrentRequest()} will return the current
682      *            request
683      */
684     public void setRequestAvailable(boolean requestAvailable)
685     {
686         _requestAvailable=requestAvailable;
687     }
688 
689     /* ------------------------------------------------------------ */
690     /**
691      * @return the current request if {@link #isRequestAvailable()} is true,
692      *         else null
693      */
694     public HttpServletRequest getCurrentRequest()
695     {
696         return _request.get();
697     }
698 
699     /* ------------------------------------------------------------ */
700     void setCurrentRequest(HttpServletRequest request)
701     {
702         _request.set(request);
703     }
704 
705     /* ------------------------------------------------------------ */
706     public Collection<Channel> getChannels()
707     {
708         List<Channel> channels=new ArrayList<Channel>();
709         _root.getChannels(channels);
710         return channels;
711     }
712 
713     /* ------------------------------------------------------------ */
714     /**
715      * @return the number of channels
716      */
717     public int getChannelCount()
718     {
719         return getChannels().size();
720     }
721 
722     /* ------------------------------------------------------------ */
723     public Collection<Client> getClients()
724     {
725         return new ArrayList<Client>(_clients.values());
726     }
727 
728     /* ------------------------------------------------------------ */
729     /**
730      * @return the number of clients
731      */
732     public int getClientCount()
733     {
734         return _clients.size();
735     }
736 
737     /* ------------------------------------------------------------ */
738     public boolean hasClient(String clientId)
739     {
740         if (clientId == null)
741             return false;
742         return _clients.containsKey(clientId);
743     }
744 
745     /* ------------------------------------------------------------ */
746     public Channel removeChannel(String channelId)
747     {
748         Channel channel=getChannel(channelId);
749 
750         boolean removed=false;
751         if (channel != null)
752             removed=channel.remove();
753 
754         if (removed)
755             return channel;
756         else
757             return null;
758     }
759 
760     /* ------------------------------------------------------------ */
761     protected void initialize(ServletContext context)
762     {
763         synchronized(this)
764         {
765             _initialized=true;
766             _context=context;
767             try
768             {
769                 _random=SecureRandom.getInstance("SHA1PRNG");
770             }
771             catch(Exception e)
772             {
773                 context.log("Could not get secure random for ID generation",e);
774                 _random=new Random();
775             }
776             _random.setSeed(_random.nextLong() ^ hashCode() ^ System.nanoTime() ^ Runtime.getRuntime().freeMemory());
777 
778             _root.addChild(new ServiceChannel(Bayeux.SERVICE));
779 
780         }
781     }
782 
783     /* ------------------------------------------------------------ */
784     long getRandom()
785     {
786         long l=_random.nextLong();
787         return l < 0?-l:l;
788     }
789 
790     /* ------------------------------------------------------------ */
791     void clientOnBrowser(String browserId, String clientId)
792     {
793         List<String> clients=_browser2client.get(browserId);
794         if (clients == null)
795         {
796             List<String> new_clients=new CopyOnWriteArrayList<String>();
797             clients=_browser2client.putIfAbsent(browserId,new_clients);
798             if (clients == null)
799                 clients=new_clients;
800         }
801         clients.add(clientId);
802     }
803 
804     /* ------------------------------------------------------------ */
805     void clientOffBrowser(String browserId, String clientId)
806     {
807         List<String> clients=_browser2client.get(browserId);
808         if (clients != null)
809         {
810             clients.remove(clientId);
811             if (clients.isEmpty())
812                 _browser2client.remove(browserId);
813         }
814     }
815 
816     /* ------------------------------------------------------------ */
817     List<String> clientsOnBrowser(String browserId)
818     {
819         return _browser2client.get(browserId);
820     }
821 
822     /* ------------------------------------------------------------ */
823     public void addListener(BayeuxListener listener)
824     {
825         if (listener instanceof ClientBayeuxListener)
826             _clientListeners.add((ClientBayeuxListener)listener);
827         if (listener instanceof ChannelBayeuxListener)
828             _channelListeners.add((ChannelBayeuxListener)listener);
829     }
830 
831     public void removeListener(BayeuxListener listener)
832     {
833         if (listener instanceof ClientBayeuxListener)
834             _clientListeners.remove(listener);
835         if (listener instanceof ChannelBayeuxListener)
836             _channelListeners.remove(listener);
837     }
838 
839     /* ------------------------------------------------------------ */
840     public int getMaxClientQueue()
841     {
842         return _maxClientQueue;
843     }
844 
845     /* ------------------------------------------------------------ */
846     public void setMaxClientQueue(int size)
847     {
848         _maxClientQueue=size;
849     }
850 
851     /* ------------------------------------------------------------ */
852     protected Message extendRcv(ClientImpl from, Message message)
853     {
854         if (_extensions != null)
855         {
856             for (int i=_extensions.length; message != null && i-- > 0;)
857                 message=_extensions[i].rcv(from,message);
858         }
859 
860         if (from != null)
861         {
862             Extension[] client_exs=from.getExtensions();
863             if (client_exs != null)
864             {
865                 for (int i=client_exs.length; message != null && i-- > 0;)
866                     message=client_exs[i].rcv(from,message);
867             }
868         }
869 
870         return message;
871     }
872 
873     /* ------------------------------------------------------------ */
874     protected Message extendRcvMeta(ClientImpl from, Message message)
875     {
876         if (_extensions != null)
877         {
878             for (int i=_extensions.length; message != null && i-- > 0;)
879                 message=_extensions[i].rcvMeta(from,message);
880         }
881 
882         if (from != null)
883         {
884             Extension[] client_exs=from.getExtensions();
885             if (client_exs != null)
886             {
887                 for (int i=client_exs.length; message != null && i-- > 0;)
888                     message=client_exs[i].rcvMeta(from,message);
889             }
890         }
891         return message;
892     }
893 
894     /* ------------------------------------------------------------ */
895     protected Message extendSendBayeux(Client from, Message message)
896     {
897         if (_extensions != null)
898         {
899             for (int i=0; message != null && i < _extensions.length; i++)
900             {
901                 message=_extensions[i].send(from,message);
902             }
903         }
904 
905         return message;
906     }
907 
908     /* ------------------------------------------------------------ */
909     public Message extendSendClient(Client from, ClientImpl to, Message message)
910     {
911         if (to != null)
912         {
913             Extension[] client_exs=to.getExtensions();
914             if (client_exs != null)
915             {
916                 for (int i=0; message != null && i < client_exs.length; i++)
917                     message=client_exs[i].send(from,message);
918             }
919         }
920 
921         return message;
922     }
923 
924     /* ------------------------------------------------------------ */
925     public Message extendSendMeta(ClientImpl from, Message message)
926     {
927         if (_extensions != null)
928         {
929             for (int i=0; message != null && i < _extensions.length; i++)
930                 message=_extensions[i].sendMeta(from,message);
931         }
932 
933         if (from != null)
934         {
935             Extension[] client_exs=from.getExtensions();
936             if (client_exs != null)
937             {
938                 for (int i=0; message != null && i < client_exs.length; i++)
939                     message=client_exs[i].sendMeta(from,message);
940             }
941         }
942 
943         return message;
944     }
945 
946     /* ------------------------------------------------------------ */
947     /**
948      * @return the maximum ms that a lazy message will wait before
949      * resuming waiting client
950      */
951     public int getMaxLazyLatency()
952     {
953         return _maxLazyLatency;
954     }
955 
956     /* ------------------------------------------------------------ */
957     /**
958      * @param ms the maximum ms that a lazy message will wait before
959      * resuming waiting client
960      */
961     public void setMaxLazyLatency(int ms)
962     {
963         _maxLazyLatency = ms;
964     }
965 
966     /* ------------------------------------------------------------ */
967     /* ------------------------------------------------------------ */
968     public static class DefaultPolicy implements SecurityPolicy
969     {
970         public boolean canHandshake(Message message)
971         {
972             return true;
973         }
974 
975         public boolean canCreate(Client client, String channel, Message message)
976         {
977             return client != null && !channel.startsWith(Bayeux.META_SLASH);
978         }
979 
980         public boolean canSubscribe(Client client, String channel, Message message)
981         {
982             if (client != null && ("/**".equals(channel) || "/*".equals(channel)))
983                 return false;
984             return client != null && !channel.startsWith(Bayeux.META_SLASH);
985         }
986 
987         public boolean canPublish(Client client, String channel, Message message)
988         {
989             return client != null || Bayeux.META_HANDSHAKE.equals(channel);
990         }
991 
992     }
993 
994     /* ------------------------------------------------------------ */
995     /* ------------------------------------------------------------ */
996     protected abstract class Handler
997     {
998         abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
999 
1000         abstract ChannelId getMetaChannelId();
1001 
1002         protected boolean isClientUnknown(Client client)
1003         {
1004             return client == null || !hasClient(client.getId());
1005         }
1006 
1007         void unknownClient(Transport transport, String channel) throws IOException
1008         {
1009             MessageImpl reply=newMessage();
1010 
1011             reply.put(CHANNEL_FIELD,channel);
1012             reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1013             reply.put(ERROR_FIELD,"402::Unknown client");
1014             reply.put("advice",_handshakeAdvice);
1015             transport.send(reply);
1016             reply.decRef();
1017         }
1018 
1019         void sendMetaReply(final ClientImpl client, Message reply, final Transport transport) throws IOException
1020         {
1021             reply=extendSendMeta(client,reply);
1022             if (reply != null)
1023             {
1024                 transport.send(reply);
1025                 if (reply instanceof MessageImpl)
1026                     ((MessageImpl)reply).decRef();
1027             }
1028         }
1029     }
1030 
1031     /* ------------------------------------------------------------ */
1032     /* ------------------------------------------------------------ */
1033     protected class ConnectHandler extends Handler
1034     {
1035         protected String _metaChannel=META_CONNECT;
1036 
1037         @Override
1038         ChannelId getMetaChannelId()
1039         {
1040             return META_CONNECT_ID;
1041         }
1042 
1043         @Override
1044         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1045         {
1046             if (isClientUnknown(client))
1047             {
1048                 unknownClient(transport,_metaChannel);
1049                 return;
1050             }
1051 
1052             // is this the first connect message?
1053             String type=client.getConnectionType();
1054             boolean polling=true;
1055             if (type == null)
1056             {
1057                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
1058                 client.setConnectionType(type);
1059                 polling=false;
1060             }
1061 
1062             Object advice=message.get(ADVICE_FIELD);
1063             if (advice != null)
1064             {
1065                 Long timeout=(Long)((Map)advice).get("timeout");
1066                 if (timeout != null && timeout.longValue() >= 0L)
1067                     client.setTimeout(timeout.longValue());
1068                 else
1069                     client.setTimeout(-1);
1070 
1071                 Long interval=(Long)((Map)advice).get("interval");
1072                 if (interval != null && interval.longValue() >= 0L)
1073                     client.setInterval(interval.longValue());
1074                 else
1075                     client.setInterval(-1);
1076             }
1077             else
1078             {
1079                 client.setTimeout(-1);
1080                 client.setInterval(-1);
1081             }
1082 
1083             advice=null;
1084 
1085             // Work out if multiple clients from some browser?
1086             if (polling && _multiFrameInterval > 0 && client.getBrowserId() != null)
1087             {
1088                 List<String> clients=clientsOnBrowser(client.getBrowserId());
1089                 if (clients != null && clients.size() > 1)
1090                 {
1091                     polling=clients.get(0).equals(client.getId());
1092                     advice=client.getAdvice();
1093                     if (advice == null)
1094                         advice=_multiFrameAdvice;
1095                     else
1096                         // could probably cache this
1097                         advice=multiFrameAdvice((JSON.Literal)advice);
1098                 }
1099             }
1100 
1101             synchronized(this)
1102             {
1103                 if (advice == null)
1104                 {
1105                     if (_adviceVersion != client._adviseVersion)
1106                     {
1107                         advice=_advice;
1108                         client._adviseVersion=_adviceVersion;
1109                     }
1110                 }
1111                 else
1112                     client._adviseVersion=-1; // clear so it is reset after multi state clears
1113             }
1114 
1115             // reply to connect message
1116             String id=message.getId();
1117 
1118             Message reply=newMessage(message);
1119 
1120             reply.put(CHANNEL_FIELD,META_CONNECT);
1121             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1122             if (advice != null)
1123                 reply.put(ADVICE_FIELD,advice);
1124             if (id != null)
1125                 reply.put(ID_FIELD,id);
1126 
1127             if (polling)
1128                 transport.setMetaConnectReply(reply);
1129             else
1130                 sendMetaReply(client,reply,transport);
1131         }
1132     }
1133 
1134     /* ------------------------------------------------------------ */
1135     /* ------------------------------------------------------------ */
1136     protected class DisconnectHandler extends Handler
1137     {
1138         @Override
1139         ChannelId getMetaChannelId()
1140         {
1141             return META_DISCONNECT_ID;
1142         }
1143 
1144         @Override
1145         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1146         {
1147             if (isClientUnknown(client))
1148             {
1149                 unknownClient(transport,META_DISCONNECT);
1150                 return;
1151             }
1152             if (isLogInfo())
1153                 logInfo("Disconnect " + client.getId());
1154 
1155             client.remove(false);
1156 
1157             Message reply=newMessage(message);
1158             reply.put(CHANNEL_FIELD,META_DISCONNECT);
1159             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1160             String id=message.getId();
1161             if (id != null)
1162                 reply.put(ID_FIELD,id);
1163 
1164             Message pollReply=transport.getMetaConnectReply();
1165             if (pollReply != null)
1166             {
1167                 transport.setMetaConnectReply(null);
1168                 sendMetaReply(client,pollReply,transport);
1169             }
1170             sendMetaReply(client,reply,transport);
1171         }
1172     }
1173 
1174     /* ------------------------------------------------------------ */
1175     /* ------------------------------------------------------------ */
1176     protected class HandshakeHandler extends Handler
1177     {
1178         @Override
1179         ChannelId getMetaChannelId()
1180         {
1181             return META_HANDSHAKE_ID;
1182         }
1183 
1184         @Override
1185         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1186         {
1187             if (client != null)
1188                 throw new IllegalStateException();
1189 
1190             if (_securityPolicy != null && !_securityPolicy.canHandshake(message))
1191             {
1192                 Message reply=newMessage(message);
1193                 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1194                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1195                 reply.put(ERROR_FIELD,"403::Handshake denied");
1196 
1197                 sendMetaReply(client,reply,transport);
1198                 return;
1199             }
1200 
1201             client=newRemoteClient();
1202 
1203             Message reply=newMessage(message);
1204             reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1205             reply.put(VERSION_FIELD,"1.0");
1206             reply.put(MIN_VERSION_FIELD,"0.9");
1207 
1208             if (client != null)
1209             {
1210                 reply.put(SUPPORTED_CONNECTION_TYPES_FIELD,_transports);
1211                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1212                 reply.put(CLIENT_FIELD,client.getId());
1213                 if (_advice != null)
1214                     reply.put(ADVICE_FIELD,_advice);
1215             }
1216             else
1217             {
1218                 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1219                 if (_advice != null)
1220                     reply.put(ADVICE_FIELD,_advice);
1221             }
1222 
1223             if (isLogDebug())
1224                 logDebug("handshake.handle: reply=" + reply);
1225 
1226             String id=message.getId();
1227             if (id != null)
1228                 reply.put(ID_FIELD,id);
1229 
1230             sendMetaReply(client,reply,transport);
1231         }
1232     }
1233 
1234     /* ------------------------------------------------------------ */
1235     /* ------------------------------------------------------------ */
1236     protected class PublishHandler extends Handler
1237     {
1238         @Override
1239         ChannelId getMetaChannelId()
1240         {
1241             return null;
1242         }
1243 
1244         @Override
1245         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1246         {
1247             if (message == null)
1248             {
1249                 Message reply = newMessage(message);
1250                 reply.put(SUCCESSFUL_FIELD, Boolean.FALSE);
1251                 reply.put(ERROR_FIELD, "404::Message deleted");
1252                 sendMetaReply(client, reply, transport);
1253                 return;
1254             }
1255 
1256             String channel_id=message.getChannel();
1257             if (isClientUnknown(client) && message.containsKey(CLIENT_FIELD))
1258             {
1259                 unknownClient(transport,channel_id);
1260                 return;
1261             }
1262 
1263             String id=message.getId();
1264 
1265             ChannelId cid=getChannelId(channel_id);
1266             Object data=message.get(Bayeux.DATA_FIELD);
1267 
1268             Message reply=newMessage(message);
1269             reply.put(CHANNEL_FIELD,channel_id);
1270             if (id != null)
1271                 reply.put(ID_FIELD,id);
1272 
1273             if (data == null)
1274             {
1275                 message=null;
1276                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1277                 reply.put(ERROR_FIELD,"403::No data");
1278             }
1279             else if (!_securityPolicy.canPublish(client,channel_id,message))
1280             {
1281                 message=null;
1282                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1283                 reply.put(ERROR_FIELD,"403::Publish denied");
1284             }
1285             else
1286             {
1287                 reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1288             }
1289 
1290             sendMetaReply(client,reply,transport);
1291 
1292             if (message != null)
1293                 _root.doDelivery(cid,client,message);
1294         }
1295     }
1296 
1297     /* ------------------------------------------------------------ */
1298     /* ------------------------------------------------------------ */
1299     protected class MetaPublishHandler extends Handler
1300     {
1301         @Override
1302         ChannelId getMetaChannelId()
1303         {
1304             return null;
1305         }
1306 
1307         @Override
1308         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1309         {
1310             String channel_id=message.getChannel();
1311 
1312             if (isClientUnknown(client) && !META_HANDSHAKE.equals(channel_id) && !META_DISCONNECT.equals(channel_id))
1313             {
1314                 // unknown client
1315                 return;
1316             }
1317 
1318             if (_securityPolicy.canPublish(client,channel_id,message))
1319             {
1320                 _root.doDelivery(getChannelId(channel_id),client,message);
1321             }
1322         }
1323     }
1324 
1325     /* ------------------------------------------------------------ */
1326     /* ------------------------------------------------------------ */
1327     protected class SubscribeHandler extends Handler
1328     {
1329         @Override
1330         ChannelId getMetaChannelId()
1331         {
1332             return META_SUBSCRIBE_ID;
1333         }
1334 
1335         @Override
1336         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1337         {
1338             if (isClientUnknown(client))
1339             {
1340                 unknownClient(transport,META_SUBSCRIBE);
1341                 return;
1342             }
1343 
1344             String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1345 
1346             // select a random channel ID if none specifified
1347             if (subscribe_id == null)
1348             {
1349                 subscribe_id=Long.toString(getRandom(),36);
1350                 while(getChannel(subscribe_id) != null)
1351                     subscribe_id=Long.toString(getRandom(),36);
1352             }
1353 
1354             ChannelId cid=null;
1355             boolean can_subscribe=false;
1356 
1357             if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1358             {
1359                 can_subscribe=true;
1360             }
1361             else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1362             {
1363                 can_subscribe=false;
1364             }
1365             else
1366             {
1367                 cid=getChannelId(subscribe_id);
1368                 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1369             }
1370 
1371             Message reply=newMessage(message);
1372             reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1373             reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1374 
1375             if (can_subscribe)
1376             {
1377                 if (cid != null)
1378                 {
1379                     ChannelImpl channel=getChannel(cid);
1380                     if (channel == null && _securityPolicy.canCreate(client,subscribe_id,message))
1381                         channel=(ChannelImpl)getChannel(subscribe_id,true);
1382 
1383                     if (channel != null)
1384                     {
1385                         // Reduces the window of time where a server-side expiration
1386                         // or a concurrent disconnect causes the invalid client to be
1387                         // registered as subscriber and hence being kept alive by the
1388                         // fact that the channel references it.
1389                         if (isClientUnknown(client))
1390                         {
1391                             unknownClient(transport, META_SUBSCRIBE);
1392                             return;
1393                         }
1394                         else
1395                         {
1396                             channel.subscribe(client);
1397                         }
1398                     }
1399                     else
1400                     {
1401                         can_subscribe=false;
1402                     }
1403                 }
1404 
1405                 if (can_subscribe)
1406                 {
1407                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1408                 }
1409                 else
1410                 {
1411                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1412                     reply.put(ERROR_FIELD,"403::cannot create");
1413                 }
1414             }
1415             else
1416             {
1417                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1418                 reply.put(ERROR_FIELD,"403::cannot subscribe");
1419 
1420             }
1421 
1422             String id=message.getId();
1423             if (id != null)
1424                 reply.put(ID_FIELD,id);
1425 
1426             sendMetaReply(client,reply,transport);
1427         }
1428     }
1429 
1430     /* ------------------------------------------------------------ */
1431     /* ------------------------------------------------------------ */
1432     protected class UnsubscribeHandler extends Handler
1433     {
1434         @Override
1435         ChannelId getMetaChannelId()
1436         {
1437             return META_UNSUBSCRIBE_ID;
1438         }
1439 
1440         @Override
1441         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1442         {
1443             if (isClientUnknown(client))
1444             {
1445                 unknownClient(transport,META_UNSUBSCRIBE);
1446                 return;
1447             }
1448 
1449             String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1450             ChannelImpl channel=getChannel(channel_id);
1451             if (channel != null)
1452                 channel.unsubscribe(client);
1453 
1454             Message reply=newMessage(message);
1455             reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1456             reply.put(SUBSCRIPTION_FIELD,channel_id);
1457             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1458 
1459             String id=message.getId();
1460             if (id != null)
1461                 reply.put(ID_FIELD,id);
1462 
1463             sendMetaReply(client,reply,transport);
1464         }
1465     }
1466 
1467     /* ------------------------------------------------------------ */
1468     /* ------------------------------------------------------------ */
1469     protected class PingHandler extends Handler
1470     {
1471         @Override
1472         ChannelId getMetaChannelId()
1473         {
1474             return META_PING_ID;
1475         }
1476 
1477         @Override
1478         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1479         {
1480             Message reply=newMessage(message);
1481             reply.put(CHANNEL_FIELD,META_PING);
1482             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1483 
1484             String id=message.getId();
1485             if (id != null)
1486                 reply.put(ID_FIELD,id);
1487 
1488             sendMetaReply(client,reply,transport);
1489         }
1490     }
1491 
1492     /* ------------------------------------------------------------ */
1493     /* ------------------------------------------------------------ */
1494     protected class ServiceChannel extends ChannelImpl
1495     {
1496         ServiceChannel(String id)
1497         {
1498             super(id,AbstractBayeux.this);
1499             setPersistent(true);
1500         }
1501 
1502         /* ------------------------------------------------------------ */
1503         @Override
1504         public ChannelImpl addChild(ChannelImpl channel)
1505         {
1506             channel.setPersistent(true);
1507             return super.addChild(channel);
1508         }
1509 
1510         /* ------------------------------------------------------------ */
1511         @Override
1512         public void subscribe(Client client)
1513         {
1514             if (client.isLocal())
1515                 super.subscribe(client);
1516         }
1517 
1518     }
1519 }