1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.CopyOnWriteArrayList;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24
25 import org.cometd.Bayeux;
26 import org.cometd.Channel;
27 import org.cometd.ChannelBayeuxListener;
28 import org.cometd.ChannelListener;
29 import org.cometd.Client;
30 import org.cometd.DataFilter;
31 import org.cometd.Message;
32 import org.cometd.SubscriptionListener;
33 import org.mortbay.log.Log;
34
35
36
37
38
39
40
41 public class ChannelImpl implements Channel
42 {
43 private final AbstractBayeux _bayeux;
44 private final ChannelId _id;
45 private final ConcurrentHashMap<String,ChannelImpl> _children=new ConcurrentHashMap<String,ChannelImpl>();
46 private final List<ClientImpl> _subscribers=new CopyOnWriteArrayList<ClientImpl>();
47 private final List<DataFilter> _dataFilters=new CopyOnWriteArrayList<DataFilter>();
48 private final List<SubscriptionListener> _subscriptionListeners=new CopyOnWriteArrayList<SubscriptionListener>();
49 private final CountDownLatch _initialized = new CountDownLatch(1);
50 private volatile ChannelImpl _wild;
51 private volatile ChannelImpl _wildWild;
52 private volatile boolean _persistent;
53 private volatile int _split;
54 private volatile boolean _lazy;
55
56
57 protected ChannelImpl(String id, AbstractBayeux bayeux)
58 {
59 _id=new ChannelId(id);
60 _bayeux=bayeux;
61 }
62
63
64
65
66
67
68
69
70
71
72
73 private void waitForInitialized()
74 {
75 try
76 {
77 if (!_initialized.await(_bayeux.getMaxInterval(), TimeUnit.MILLISECONDS))
78 throw new IllegalStateException("Not Initialized: " + this);
79 }
80 catch (InterruptedException x)
81 {
82 throw new IllegalStateException("Initialization interrupted: " + this, x);
83 }
84 }
85
86
87 private void initialized()
88 {
89 _initialized.countDown();
90 }
91
92
93
94
95
96
97
98
99 public boolean isLazy()
100 {
101 return _lazy;
102 }
103
104
105
106
107
108
109
110
111
112 public void setLazy(boolean lazy)
113 {
114 _lazy=lazy;
115 }
116
117
118
119
120
121
122
123
124 public ChannelImpl addChild(ChannelImpl channel)
125 {
126 ChannelId child=channel.getChannelId();
127 if (!_id.isParentOf(child))
128 {
129 throw new IllegalArgumentException(_id + " not parent of " + child);
130 }
131
132 String next=child.getSegment(_id.depth());
133
134 if ((child.depth() - _id.depth()) == 1)
135 {
136
137 ChannelImpl old=_children.putIfAbsent(next,channel);
138 if (old != null)
139 {
140 old.waitForInitialized();
141 return old;
142 }
143
144 if (ChannelId.WILD.equals(next))
145 _wild=channel;
146 else if (ChannelId.WILDWILD.equals(next))
147 _wildWild=channel;
148 _bayeux.addChannel(channel);
149 channel.initialized();
150 return channel;
151 }
152 else
153 {
154 ChannelImpl branch=(ChannelImpl)_bayeux.getChannel((_id.depth() == 0?"/":(_id.toString() + "/")) + next,true);
155 return branch.addChild(channel);
156 }
157 }
158
159
160
161
162
163 public void addDataFilter(DataFilter filter)
164 {
165 _dataFilters.add(filter);
166 }
167
168
169
170
171
172 public ChannelId getChannelId()
173 {
174 return _id;
175 }
176
177
178 public ChannelImpl getChild(ChannelId id)
179 {
180 String next=id.getSegment(_id.depth());
181 if (next == null)
182 return null;
183
184 ChannelImpl channel=_children.get(next);
185 if (channel!=null)
186 channel.waitForInitialized();
187
188 if (channel == null || channel.getChannelId().depth() == id.depth())
189 {
190 return channel;
191 }
192 return channel.getChild(id);
193 }
194
195
196 public void getChannels(List<Channel> list)
197 {
198 list.add(this);
199 for (ChannelImpl channel : _children.values())
200 channel.getChannels(list);
201 }
202
203
204 public int getChannelCount()
205 {
206 return _children.size();
207 }
208
209
210
211
212
213 public String getId()
214 {
215 return _id.toString();
216 }
217
218
219 public boolean isPersistent()
220 {
221 return _persistent;
222 }
223
224
225 public void deliver(Client from, Iterable<Client> to, Object data, String id)
226 {
227 MessageImpl message=_bayeux.newMessage();
228 message.put(Bayeux.CHANNEL_FIELD,getId());
229 message.put(Bayeux.DATA_FIELD,data);
230 if (id != null)
231 message.put(Bayeux.ID_FIELD,id);
232
233 Message m=_bayeux.extendSendBayeux(from,message);
234
235 if (m != null)
236 {
237 for (Client t : to)
238 deliverToSubscriber((ClientImpl)t,from,m);
239 }
240 if (m instanceof MessageImpl)
241 ((MessageImpl)m).decRef();
242 }
243
244
245 public void publish(Client fromClient, Object data, String msgId)
246 {
247 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
248 }
249
250
251 public void publishLazy(Client fromClient, Object data, String msgId)
252 {
253 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
254 }
255
256
257 public boolean remove()
258 {
259 return _bayeux.removeChannel(this);
260 }
261
262
263 public boolean doRemove(ChannelImpl channel, List<ChannelBayeuxListener> listeners)
264 {
265 ChannelId channelId=channel.getChannelId();
266 int diff=channel._id.depth() - _id.depth();
267
268 if (diff >= 1)
269 {
270 String key=channelId.getSegment(_id.depth());
271 ChannelImpl child=_children.get(key);
272
273 if (child != null)
274 {
275
276 if (diff == 1)
277 {
278 if (!child.isPersistent())
279 {
280
281 child=_children.remove(key);
282 if (child !=null)
283 {
284 if (_wild==channel)
285 _wild=null;
286 else if (_wildWild==channel)
287 _wildWild=null;
288 if ( child.getChannelCount() > 0)
289 {
290
291 for (ChannelImpl c : child._children.values())
292 child.doRemove(c,listeners);
293 }
294 for (ChannelBayeuxListener l : listeners)
295 l.channelRemoved(child);
296 }
297 return true;
298 }
299 return false;
300 }
301
302 boolean removed=child.doRemove(channel,listeners);
303
304
305 if (removed && !child.isPersistent() && child.getChannelCount() == 0 && child.getSubscriberCount() == 0)
306 {
307 child=_children.remove(key);
308 if (child!=null)
309 for (ChannelBayeuxListener l : listeners)
310 l.channelRemoved(child);
311 }
312
313 return removed;
314 }
315
316 }
317 return false;
318 }
319
320
321
322
323
324 public DataFilter removeDataFilter(DataFilter filter)
325 {
326 _dataFilters.remove(filter);
327 return filter;
328 }
329
330
331 public void setPersistent(boolean persistent)
332 {
333 _persistent=persistent;
334 }
335
336
337
338
339
340 public void subscribe(Client client)
341 {
342 if (!(client instanceof ClientImpl))
343 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
344
345 for (ClientImpl c : _subscribers)
346 {
347 if (client.equals(c))
348 return;
349 }
350
351 _subscribers.add((ClientImpl)client);
352
353 for (SubscriptionListener l : _subscriptionListeners)
354 l.subscribed(client,this);
355
356 ((ClientImpl)client).addSubscription(this);
357 }
358
359
360 @Override
361 public String toString()
362 {
363 return _id.toString();
364 }
365
366
367
368
369
370 public void unsubscribe(Client c)
371 {
372 if (!(c instanceof ClientImpl))
373 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
374 ClientImpl client = (ClientImpl)c;
375
376 client.removeSubscription(this);
377
378 _subscribers.remove(client);
379
380 for (SubscriptionListener l : _subscriptionListeners)
381 l.unsubscribed(client,this);
382
383 if (!_persistent && _subscribers.size() == 0 && _children.size() == 0)
384 remove();
385 }
386
387
388 protected void doDelivery(ChannelId to, Client from, Message msg)
389 {
390 int tail=to.depth() - _id.depth();
391
392 Object data=msg.getData();
393
394
395 if (data != null)
396 {
397 Object old=data;
398
399 try
400 {
401 switch(tail)
402 {
403 case 0:
404 {
405 for (DataFilter filter : _dataFilters)
406 {
407 data=filter.filter(from,this,data);
408 if (data == null)
409 return;
410 }
411 }
412 break;
413
414 case 1:
415 final ChannelImpl wild = _wild;
416 if (wild != null)
417 {
418 for (DataFilter filter : wild._dataFilters)
419 {
420 data=filter.filter(from,this,data);
421 if (data == null)
422 return;
423 }
424 }
425
426 default:
427 final ChannelImpl wildWild = _wildWild;
428 if (wildWild != null)
429 {
430 for (DataFilter filter : wildWild._dataFilters)
431 {
432 data=filter.filter(from,this,data);
433 if (data == null)
434 return;
435 }
436 }
437 }
438 }
439 catch(IllegalStateException e)
440 {
441 Log.ignore(e);
442 return;
443 }
444
445
446
447 if (data != old)
448 msg.put(AbstractBayeux.DATA_FIELD,data);
449 }
450
451 switch(tail)
452 {
453 case 0:
454 {
455 if (_lazy && msg instanceof MessageImpl)
456 ((MessageImpl)msg).setLazy(true);
457
458 final ClientImpl[] subscribers=_subscribers.toArray(new ClientImpl[_subscribers.size()]);
459 if (subscribers.length > 0)
460 {
461
462 int split=_split++ % subscribers.length;
463 for (int i=split; i < subscribers.length; i++)
464 deliverToSubscriber(subscribers[i],from,msg);
465 for (int i=0; i < split; i++)
466 deliverToSubscriber(subscribers[i],from,msg);
467 }
468 break;
469 }
470
471 case 1:
472 final ChannelImpl wild = _wild;
473 if (wild != null)
474 {
475 if (wild._lazy && msg instanceof MessageImpl)
476 ((MessageImpl)msg).setLazy(true);
477 for (ClientImpl client : wild._subscribers)
478 wild.deliverToSubscriber(client,from,msg);
479 }
480
481 default:
482 {
483 final ChannelImpl wildWild = _wildWild;
484 if (wildWild != null)
485 {
486 if (wildWild._lazy && msg instanceof MessageImpl)
487 ((MessageImpl)msg).setLazy(true);
488 for (ClientImpl client : wildWild._subscribers)
489 wildWild.deliverToSubscriber(client,from,msg);
490 }
491 String next=to.getSegment(_id.depth());
492 ChannelImpl channel=_children.get(next);
493 if (channel != null)
494 channel.doDelivery(to,from,msg);
495 }
496 }
497 }
498
499 private void deliverToSubscriber(ClientImpl subscriber, Client from, Message message)
500 {
501 if (_bayeux.hasClient(subscriber.getId()))
502 subscriber.doDelivery(from, message);
503 else
504 unsubscribe(subscriber);
505 }
506
507
508 public Collection<Client> getSubscribers()
509 {
510 return new ArrayList<Client>(_subscribers);
511 }
512
513
514 public int getSubscriberCount()
515 {
516 return _subscribers.size();
517 }
518
519
520
521
522
523
524
525 public Collection<DataFilter> getDataFilters()
526 {
527 return new ArrayList<DataFilter>(_dataFilters);
528 }
529
530
531 public void addListener(ChannelListener listener)
532 {
533 if (listener instanceof SubscriptionListener)
534 {
535 _subscriptionListeners.add((SubscriptionListener)listener);
536 }
537 }
538
539 public void removeListener(ChannelListener listener)
540 {
541 if (listener instanceof SubscriptionListener)
542 {
543 _subscriptionListeners.remove((SubscriptionListener)listener);
544 }
545 }
546 }