View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.CopyOnWriteArrayList;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.cometd.Bayeux;
26  import org.cometd.Channel;
27  import org.cometd.ChannelBayeuxListener;
28  import org.cometd.ChannelListener;
29  import org.cometd.Client;
30  import org.cometd.DataFilter;
31  import org.cometd.Message;
32  import org.cometd.SubscriptionListener;
33  import org.mortbay.log.Log;
34  
35  /* ------------------------------------------------------------ */
36  /**
37   * A Bayeux Channel
38   *
39   * @version $Revision: 1891 $ $Date: 2010-05-10 21:12:17 +1000 (Mon, 10 May 2010) $
40   */
41  public class ChannelImpl implements Channel
42  {
43      private final AbstractBayeux _bayeux;
44      private final ChannelId _id;
45      private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46      private final List<ClientImpl> _subscribers=new CopyOnWriteArrayList<ClientImpl>();
47      private final List<DataFilter> _dataFilters=new CopyOnWriteArrayList<DataFilter>();
48      private final List<SubscriptionListener> _subscriptionListeners=new CopyOnWriteArrayList<SubscriptionListener>();
49      private final CountDownLatch _initialized = new CountDownLatch(1);
50      private volatile ChannelImpl _wild;
51      private volatile ChannelImpl _wildWild;
52      private volatile boolean _persistent;
53      private volatile int _split;
54      private volatile boolean _lazy;
55  
56      /* ------------------------------------------------------------ */
57      protected ChannelImpl(String id, AbstractBayeux bayeux)
58      {
59          _id=new ChannelId(id);
60          _bayeux=bayeux;
61      }
62  
63      /* ------------------------------------------------------------ */
64      /**
65       * Wait for the channel to be initialized, at most for bayeux maxInterval.
66       * Channel initialization means waiting for {@link #addChild(ChannelImpl)}
67       * to finish calling {@link AbstractBayeux#addChannel(ChannelImpl)},
68       * which notifies channel listeners. Channel initialization may therefore
69       * be delayed in case of slow listeners, but it is guaranteed that any
70       * concurrent channel creation will wait until listeners have been called,
71       * therefore enforcing channel creation + notification to be atomic.
72       */
73      private void waitForInitialized()
74      {
75          try
76          {
77              if (!_initialized.await(_bayeux.getMaxInterval(), TimeUnit.MILLISECONDS))
78                  throw new IllegalStateException("Not Initialized: " + this);
79          }
80          catch (InterruptedException x)
81          {
82              throw new IllegalStateException("Initialization interrupted: " + this, x);
83          }
84      }
85  
86      /* ------------------------------------------------------------ */
87      private void initialized()
88      {
89          _initialized.countDown();
90      }
91  
92      /* ------------------------------------------------------------ */
93      /**
94       * A Lazy channel marks published messages as lazy. Lazy messages are queued
95       * but do not wake up waiting clients.
96       *
97       * @return true if message is lazy
98       */
99      public boolean isLazy()
100     {
101         return _lazy;
102     }
103 
104     /* ------------------------------------------------------------ */
105     /**
106      * A Lazy channel marks published messages as lazy. Lazy messages are queued
107      * but do not wake up waiting clients.
108      *
109      * @param lazy
110      *            true if message is lazy
111      */
112     public void setLazy(boolean lazy)
113     {
114         _lazy=lazy;
115     }
116 
117     /* ------------------------------------------------------------ */
118     /**
119      * Adds a channel
120      * @param channel the child channel to add
121      * @return The added channel, or the existing channel if another thread
122      * already added the channel
123      */
124     public ChannelImpl addChild(ChannelImpl channel)
125     {
126         ChannelId child=channel.getChannelId();
127         if (!_id.isParentOf(child))
128         {
129             throw new IllegalArgumentException(_id + " not parent of " + child);
130         }
131 
132         String next=child.getSegment(_id.depth());
133 
134         if ((child.depth() - _id.depth()) == 1)
135         {
136             // add the channel to this channels
137             ChannelImpl old=_children.putIfAbsent(next,channel);
138             if (old != null)
139             {
140                 old.waitForInitialized();
141                 return old;
142             }
143 
144             if (ChannelId.WILD.equals(next))
145                 _wild=channel;
146             else if (ChannelId.WILDWILD.equals(next))
147                 _wildWild=channel;
148             _bayeux.addChannel(channel);
149             channel.initialized();
150             return channel;
151         }
152         else
153         {
154             ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
155             return branch.addChild(channel);
156         }
157     }
158 
159     /* ------------------------------------------------------------ */
160     /**
161      * @param filter the data filter to add
162      */
163     public void addDataFilter(DataFilter filter)
164     {
165         _dataFilters.add(filter);
166     }
167 
168     /* ------------------------------------------------------------ */
169     /**
170      * @return the ChannelId of this channel
171      */
172     public ChannelId getChannelId()
173     {
174         return _id;
175     }
176 
177     /* ------------------------------------------------------------ */
178     public ChannelImpl getChild(ChannelId id)
179     {
180         String next=id.getSegment(_id.depth());
181         if (next == null)
182             return null;
183 
184         ChannelImpl channel=_children.get(next);
185         if (channel!=null)
186             channel.waitForInitialized();
187 
188         if (channel == null || channel.getChannelId().depth() == id.depth())
189         {
190             return channel;
191         }
192         return channel.getChild(id);
193     }
194 
195     /* ------------------------------------------------------------ */
196     public void getChannels(List<Channel> list)
197     {
198         list.add(this);
199         for (ChannelImpl channel : _children.values())
200             channel.getChannels(list);
201     }
202 
203     /* ------------------------------------------------------------ */
204     public int getChannelCount()
205     {
206         return _children.size();
207     }
208 
209     /* ------------------------------------------------------------ */
210     /**
211      * @return the id of this channel in string form
212      */
213     public String getId()
214     {
215         return _id.toString();
216     }
217 
218     /* ------------------------------------------------------------ */
219     public boolean isPersistent()
220     {
221         return _persistent;
222     }
223 
224     /* ------------------------------------------------------------ */
225     public void deliver(Client from, Iterable<Client> to, Object data, String id)
226     {
227         MessageImpl message=_bayeux.newMessage();
228         message.put(Bayeux.CHANNEL_FIELD,getId());
229         message.put(Bayeux.DATA_FIELD,data);
230         if (id != null)
231             message.put(Bayeux.ID_FIELD,id);
232 
233         Message m=_bayeux.extendSendBayeux(from,message);
234 
235         if (m != null)
236         {
237             for (Client t : to)
238                 deliverToSubscriber((ClientImpl)t,from,m);
239         }
240         if (m instanceof MessageImpl)
241             ((MessageImpl)m).decRef();
242     }
243 
244     /* ------------------------------------------------------------ */
245     public void publish(Client fromClient, Object data, String msgId)
246     {
247         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
248     }
249 
250     /* ------------------------------------------------------------ */
251     public void publishLazy(Client fromClient, Object data, String msgId)
252     {
253         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
254     }
255 
256     /* ------------------------------------------------------------ */
257     public boolean remove()
258     {
259         return _bayeux.removeChannel(this);
260     }
261 
262     /* ------------------------------------------------------------ */
263     public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
264     {
265         ChannelId channelId=channel.getChannelId();
266         int diff=channel._id.depth() - _id.depth();
267 
268         if (diff >= 1)
269         {
270             String key=channelId.getSegment(_id.depth());
271             ChannelImpl child=_children.get(key);
272 
273             if (child != null)
274             {
275                 // is it this child we are removing?
276                 if (diff == 1)
277                 {
278                     if (!child.isPersistent())
279                     {
280                         // remove the child
281                         child=_children.remove(key);
282                         if (child !=null)
283                         {
284                             if (_wild==channel)
285                                 _wild=null;
286                             else if (_wildWild==channel)
287                                 _wildWild=null;
288                             if ( child.getChannelCount() > 0)
289                             {
290                                 // remove the children of the child
291                                 for (ChannelImpl c : child._children.values())
292                                     child.doRemove(c,listeners);
293                             }
294                             for (ChannelBayeuxListener l : listeners)
295                                 l.channelRemoved(child);
296                         }
297                         return true;
298                     }
299                     return false;
300                 }
301 
302                 boolean removed=child.doRemove(channel,listeners);
303 
304                 // Do we remove a non persistent child?
305                 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
306                 {
307                     child=_children.remove(key);
308                     if (child!=null)
309                         for (ChannelBayeuxListener l : listeners)
310                             l.channelRemoved(child);
311                 }
312 
313                 return removed;
314             }
315 
316         }
317         return false;
318     }
319 
320     /* ------------------------------------------------------------ */
321     /**
322      * @param filter the data filter to remove
323      */
324     public DataFilter removeDataFilter(DataFilter filter)
325     {
326         _dataFilters.remove(filter);
327         return filter;
328     }
329 
330     /* ------------------------------------------------------------ */
331     public void setPersistent(boolean persistent)
332     {
333         _persistent=persistent;
334     }
335 
336     /* ------------------------------------------------------------ */
337     /**
338      * @param client the client to subscribe to this channel
339      */
340     public void subscribe(Client client)
341     {
342         if (!(client instanceof ClientImpl))
343             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
344 
345         for (ClientImpl c : _subscribers)
346         {
347             if (client.equals(c))
348                 return;
349         }
350 
351         _subscribers.add((ClientImpl)client);
352 
353         for (SubscriptionListener l : _subscriptionListeners)
354             l.subscribed(client,this);
355 
356         ((ClientImpl)client).addSubscription(this);
357     }
358 
359     /* ------------------------------------------------------------ */
360     @Override
361     public String toString()
362     {
363         return _id.toString();
364     }
365 
366     /* ------------------------------------------------------------ */
367     /**
368      * @param c the client to unsubscribe from this channel
369      */
370     public void unsubscribe(Client c)
371     {
372         if (!(c instanceof ClientImpl))
373             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
374         ClientImpl client = (ClientImpl)c;
375 
376         client.removeSubscription(this);
377 
378         _subscribers.remove(client);
379 
380         for (SubscriptionListener l : _subscriptionListeners)
381             l.unsubscribed(client,this);
382 
383         if (!_persistent && _subscribers.size() == 0 && _children.size() == 0)
384             remove();
385     }
386 
387     /* ------------------------------------------------------------ */
388     protected void doDelivery(ChannelId to, Client from, Message msg)
389     {
390         int tail=to.depth() - _id.depth();
391 
392         Object data=msg.getData();
393 
394         // if we have data, filter it
395         if (data != null)
396         {
397             Object old=data;
398 
399             try
400             {
401                 switch(tail)
402                 {
403                     case 0:
404                     {
405                         for (DataFilter filter : _dataFilters)
406                         {
407                             data=filter.filter(from,this,data);
408                             if (data == null)
409                                 return;
410                         }
411                     }
412                         break;
413 
414                     case 1:
415                         final ChannelImpl wild = _wild;
416                         if (wild != null)
417                         {
418                             for (DataFilter filter : wild._dataFilters)
419                             {
420                                 data=filter.filter(from,this,data);
421                                 if (data == null)
422                                     return;
423                             }
424                         }
425 
426                     default:
427                         final ChannelImpl wildWild = _wildWild;
428                         if (wildWild != null)
429                         {
430                             for (DataFilter filter : wildWild._dataFilters)
431                             {
432                                 data=filter.filter(from,this,data);
433                                 if (data == null)
434                                     return;
435                             }
436                         }
437                 }
438             }
439             catch(IllegalStateException e)
440             {
441                 Log.ignore(e);
442                 return;
443             }
444 
445             // TODO this may not be correct if the message is reused.
446             // probably should close message ?
447             if (data != old)
448                 msg.put(AbstractBayeux.DATA_FIELD,data);
449         }
450 
451         switch(tail)
452         {
453             case 0:
454             {
455                 if (_lazy && msg instanceof MessageImpl)
456                     ((MessageImpl)msg).setLazy(true);
457 
458                 final ClientImpl[] subscribers=_subscribers.toArray(new ClientImpl[_subscribers.size()]);
459                 if (subscribers.length > 0)
460                 {
461                     // fair delivery
462                     int split=_split++ % subscribers.length;
463                     for (int i=split; i < subscribers.length; i++)
464                         deliverToSubscriber(subscribers[i],from,msg);
465                     for (int i=0; i < split; i++)
466                         deliverToSubscriber(subscribers[i],from,msg);
467                 }
468                 break;
469             }
470 
471             case 1:
472                 final ChannelImpl wild = _wild;
473                 if (wild != null)
474                 {
475                     if (wild._lazy && msg instanceof MessageImpl)
476                         ((MessageImpl)msg).setLazy(true);
477                     for (ClientImpl client : wild._subscribers)
478                         wild.deliverToSubscriber(client,from,msg);
479                 }
480 
481             default:
482             {
483                 final ChannelImpl wildWild = _wildWild;
484                 if (wildWild != null)
485                 {
486                     if (wildWild._lazy && msg instanceof MessageImpl)
487                         ((MessageImpl)msg).setLazy(true);
488                     for (ClientImpl client : wildWild._subscribers)
489                         wildWild.deliverToSubscriber(client,from,msg);
490                 }
491                 String next=to.getSegment(_id.depth());
492                 ChannelImpl channel=_children.get(next);
493                 if (channel != null)
494                     channel.doDelivery(to,from,msg);
495             }
496         }
497     }
498 
499     private void deliverToSubscriber(ClientImpl subscriber, Client from, Message message)
500     {
501         if (_bayeux.hasClient(subscriber.getId()))
502             subscriber.doDelivery(from, message);
503         else
504             unsubscribe(subscriber);
505     }
506 
507     /* ------------------------------------------------------------ */
508     public Collection<Client> getSubscribers()
509     {
510         return new ArrayList<Client>(_subscribers);
511     }
512 
513     /* ------------------------------------------------------------ */
514     public int getSubscriberCount()
515     {
516         return _subscribers.size();
517     }
518 
519     /* ------------------------------------------------------------ */
520     /*
521      * (non-Javadoc)
522      *
523      * @see dojox.cometd.Channel#getFilters()
524      */
525     public Collection<DataFilter> getDataFilters()
526     {
527         return new ArrayList<DataFilter>(_dataFilters);
528     }
529 
530     /* ------------------------------------------------------------ */
531     public void addListener(ChannelListener listener)
532     {
533         if (listener instanceof SubscriptionListener)
534         {
535             _subscriptionListeners.add((SubscriptionListener)listener);
536         }
537     }
538 
539     public void removeListener(ChannelListener listener)
540     {
541         if (listener instanceof SubscriptionListener)
542         {
543             _subscriptionListeners.remove((SubscriptionListener)listener);
544         }
545     }
546 }