1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22
23 import org.cometd.Channel;
24 import org.cometd.ChannelListener;
25 import org.cometd.Client;
26 import org.cometd.DataFilter;
27 import org.cometd.Message;
28 import org.cometd.SubscriptionListener;
29 import org.mortbay.log.Log;
30 import org.mortbay.util.LazyList;
31
32
33
34
35
36
37
38
39 public class ChannelImpl implements Channel
40 {
41 protected AbstractBayeux _bayeux;
42 private ClientImpl[] _subscribers=new ClientImpl[0];
43 private DataFilter[] _dataFilters=new DataFilter[0];
44 private SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0];
45 private ChannelId _id;
46 private ConcurrentMap<String,ChannelImpl> _children = new ConcurrentHashMap<String, ChannelImpl>();
47 private ChannelImpl _wild;
48 private ChannelImpl _wildWild;
49 private boolean _persistent;
50 private int _split;
51
52
53 ChannelImpl(String id,AbstractBayeux bayeux)
54 {
55 _id=new ChannelId(id);
56 _bayeux=bayeux;
57 }
58
59
60 public void addChild(ChannelImpl channel)
61 {
62 ChannelId child=channel.getChannelId();
63 if (!_id.isParentOf(child))
64 {
65 throw new IllegalArgumentException(_id+" not parent of "+child);
66 }
67
68 String next = child.getSegment(_id.depth());
69
70 if ((child.depth()-_id.depth())==1)
71 {
72
73 ChannelImpl old = _children.putIfAbsent(next,channel);
74
75 if (old!=null)
76 throw new IllegalArgumentException("Already Exists");
77
78 if (ChannelId.WILD.equals(next))
79 _wild=channel;
80 else if (ChannelId.WILDWILD.equals(next))
81 _wildWild=channel;
82
83 }
84 else
85 {
86 ChannelImpl branch=_children.get(next);
87 branch=(ChannelImpl)_bayeux.getChannel((_id.depth()==0?"/":(_id.toString()+"/"))+next,true);
88
89 branch.addChild(channel);
90 }
91
92 _bayeux.addChannel(channel);
93 }
94
95
96
97
98
99 public void addDataFilter(DataFilter filter)
100 {
101 synchronized(this)
102 {
103 _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
104 }
105 }
106
107
108
109
110
111
112 public ChannelId getChannelId()
113 {
114 return _id;
115 }
116
117
118 public ChannelImpl getChild(ChannelId id)
119 {
120 String next=id.getSegment(_id.depth());
121 if (next==null)
122 return null;
123
124 ChannelImpl channel = _children.get(next);
125
126 if (channel==null || channel.getChannelId().depth()==id.depth())
127 {
128 return channel;
129 }
130 return channel.getChild(id);
131 }
132
133
134 public void getChannels(List<Channel> list)
135 {
136 list.add(this);
137 for (ChannelImpl channel: _children.values())
138 channel.getChannels(list);
139 }
140
141
142 public int getChannelCount()
143 {
144 int count = 1;
145
146 for(ChannelImpl channel: _children.values())
147 count += channel.getChannelCount();
148
149 return count;
150 }
151
152
153
154
155
156 public String getId()
157 {
158 return _id.toString();
159 }
160
161
162
163 public boolean isPersistent()
164 {
165 return _persistent;
166 }
167
168
169 public void publish(Client fromClient, Object data, String msgId)
170 {
171 _bayeux.doPublish(getChannelId(),fromClient,data,msgId);
172 }
173
174
175 public boolean remove()
176 {
177 return _bayeux.removeChannel(this);
178 }
179
180
181 public boolean doRemove(ChannelImpl channel)
182 {
183 ChannelId channelId = channel.getChannelId();
184 String key = channelId.getSegment(channelId.depth()-1);
185 if (_children.containsKey(key))
186 {
187 ChannelImpl child = _children.get(key);
188
189 synchronized (this)
190 {
191 synchronized (child)
192 {
193 if (!child.isPersistent() && child.getSubscriberCount()==0 && child.getChannelCount()==1)
194 {
195 _children.remove(key);
196 return true;
197 }
198 else
199 return false;
200 }
201
202 }
203 }
204 else
205 {
206 for (ChannelImpl child : _children.values())
207 {
208 if (child.doRemove(channel))
209 return true;
210 }
211 }
212 return false;
213 }
214
215
216
217
218
219
220 public DataFilter removeDataFilter(DataFilter filter)
221 {
222 synchronized(this)
223 {
224 _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
225 return filter;
226 }
227 }
228
229
230 public void setPersistent(boolean persistent)
231 {
232 _persistent=persistent;
233 }
234
235
236
237
238
239 public void subscribe(Client client)
240 {
241 if (!(client instanceof ClientImpl))
242 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
243
244 synchronized (this)
245 {
246 for (ClientImpl c : _subscribers)
247 {
248 if (client.equals(c))
249 return;
250 }
251 _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
252
253 for (SubscriptionListener l : _subscriptionListeners)
254 l.subscribed(client, this);
255 }
256
257 ((ClientImpl)client).addSubscription(this);
258 }
259
260
261 @Override
262 public String toString()
263 {
264 return _id.toString();
265 }
266
267
268
269
270
271 public void unsubscribe(Client client)
272 {
273 if (!(client instanceof ClientImpl))
274 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
275 ((ClientImpl)client).removeSubscription(this);
276 synchronized(this)
277 {
278 _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
279
280 for (SubscriptionListener l : _subscriptionListeners)
281 l.unsubscribed(client,this);
282
283 if (!_persistent && _subscribers.length==0 && _children.size()==0)
284 remove();
285 }
286 }
287
288
289 protected void doDelivery(ChannelId to, Client from, Message msg)
290 {
291 int tail = to.depth()-_id.depth();
292
293 Object data = msg.getData();
294 Object old = data;
295
296 DataFilter[] filters=null;
297
298 try
299 {
300 switch(tail)
301 {
302 case 0:
303 {
304 synchronized(this)
305 {
306 filters=_dataFilters;
307 }
308 for (DataFilter filter: filters)
309 data=filter.filter(from,this,data);
310 }
311 break;
312
313 case 1:
314 if (_wild!=null)
315 {
316 synchronized(_wild)
317 {
318 filters=_wild._dataFilters;
319 }
320 for (DataFilter filter: filters)
321 data=filter.filter(from,this,data);
322 }
323
324 default:
325 if (_wildWild!=null)
326 {
327 synchronized(_wildWild)
328 {
329 filters=_wildWild._dataFilters;
330 }
331 for (DataFilter filter: filters)
332 {
333 data=filter.filter(from,this,data);
334 }
335 }
336 }
337 }
338 catch (IllegalStateException e)
339 {
340 Log.debug(e);
341 return;
342 }
343 if (data!=old)
344 msg.put(AbstractBayeux.DATA_FIELD,data);
345
346 ClientImpl[] subscribers;
347
348 switch(tail)
349 {
350 case 0:
351 synchronized (this)
352 {
353 subscribers=_subscribers;
354 _split++;
355 }
356 if (subscribers.length>0)
357 {
358
359 int split=_split%_subscribers.length;
360 for (int i=split;i<subscribers.length;i++)
361 subscribers[i].doDelivery(from,msg);
362 for (int i=0;i<split;i++)
363 subscribers[i].doDelivery(from,msg);
364 }
365 break;
366
367 case 1:
368 if (_wild!=null)
369 {
370 synchronized (_wild)
371 {
372 subscribers=_wild._subscribers;
373 }
374 for (ClientImpl client: subscribers)
375 {
376 client.doDelivery(from,msg);
377 }
378 }
379
380 default:
381 {
382 if (_wildWild!=null)
383 {
384 synchronized (_wildWild)
385 {
386 subscribers=_wildWild._subscribers;
387 }
388 for (ClientImpl client: subscribers)
389 {
390 client.doDelivery(from,msg);
391 }
392 }
393 String next = to.getSegment(_id.depth());
394 ChannelImpl channel = _children.get(next);
395 if (channel!=null)
396 channel.doDelivery(to,from,msg);
397 }
398 }
399 }
400
401
402 public Collection<Client> getSubscribers()
403 {
404 synchronized(this)
405 {
406 return Arrays.asList((Client[])_subscribers);
407 }
408 }
409
410
411 public int getSubscriberCount()
412 {
413 synchronized(this)
414 {
415 return _subscribers.length;
416 }
417 }
418
419
420
421
422
423
424 public Collection<DataFilter> getDataFilters()
425 {
426 synchronized(this)
427 {
428 return Arrays.asList(_dataFilters);
429 }
430 }
431
432
433 public void addListener(ChannelListener listener)
434 {
435 synchronized(this)
436 {
437 if (listener instanceof SubscriptionListener)
438 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
439 }
440 }
441
442 }