1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Queue;
21
22 import org.cometd.Bayeux;
23 import org.cometd.Client;
24 import org.cometd.DeliverListener;
25 import org.cometd.Extension;
26 import org.cometd.ClientListener;
27 import org.cometd.Message;
28 import org.cometd.MessageListener;
29 import org.cometd.QueueListener;
30 import org.cometd.RemoveListener;
31 import org.mortbay.util.ArrayQueue;
32 import org.mortbay.util.LazyList;
33 import org.mortbay.util.ajax.JSON;
34
35
36
37
38
39
40
41
42 public class ClientImpl implements Client
43 {
44 private String _id;
45 private String _type;
46 private int _responsesPending;
47 private ChannelImpl[] _subscriptions=new ChannelImpl[0];
48 private boolean _JSONCommented;
49 private RemoveListener[] _rListeners;
50 private MessageListener[] _syncMListeners;
51 private MessageListener[] _asyncMListeners;
52 private QueueListener[] _qListeners;
53 private DeliverListener[] _dListeners;
54 protected AbstractBayeux _bayeux;
55 private String _browserId;
56 private JSON.Literal _advice;
57 private int _batch;
58 private int _maxQueue;
59 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
60 private long _timeout;
61
62
63 int _adviseVersion;
64
65
66 protected ClientImpl(AbstractBayeux bayeux)
67 {
68 _bayeux=bayeux;
69 _maxQueue=bayeux.getMaxClientQueue();
70 _bayeux.addClient(this,null);
71 if (_bayeux.isLogInfo())
72 _bayeux.logInfo("newClient: "+this);
73 }
74
75
76 protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
77 {
78 _bayeux=bayeux;
79 _maxQueue=0;
80
81 _bayeux.addClient(this,idPrefix);
82
83 if (_bayeux.isLogInfo())
84 _bayeux.logInfo("newClient: "+this);
85
86 }
87
88
89 public void deliver(Client from, String toChannel, Object data, String id)
90 {
91
92 Message message=_bayeux.newMessage();
93 message.put(Bayeux.CHANNEL_FIELD,toChannel);
94 message.put(Bayeux.DATA_FIELD,data);
95 if (id!=null)
96 message.put(Bayeux.ID_FIELD,id);
97
98 for (Extension e:_bayeux._extensions)
99 message=e.send(message);
100 doDelivery(from,message);
101
102 ((MessageImpl)message).decRef();
103 }
104
105
106 protected void doDelivery(Client from, Message message)
107 {
108 MessageListener[] alisteners=null;
109 synchronized(this)
110 {
111 ((MessageImpl)message).incRef();
112
113 if (_maxQueue<0)
114 {
115 _queue.addUnsafe(message);
116 }
117 else
118 {
119 boolean add=_maxQueue>0;
120 if (_queue.size()>=_maxQueue && _qListeners!=null)
121 {
122 for (QueueListener l : _qListeners)
123 {
124 add &= l.queueMaxed((Client)this,message);
125 }
126 }
127
128 if (add)
129 _queue.addUnsafe(message);
130 }
131
132
133 if (_syncMListeners!=null)
134 for (MessageListener l:_syncMListeners)
135 l.deliver(from,this,message);
136 alisteners=_asyncMListeners;
137
138 if (_batch==0 && _responsesPending<1 && _queue.size()>0)
139 resume();
140 }
141
142
143 if (alisteners!=null)
144 for (MessageListener l:alisteners)
145 l.deliver(from,this,message);
146 }
147
148
149 public void doDeliverListeners()
150 {
151 synchronized (this)
152 {
153 if (_dListeners!=null)
154 for (DeliverListener l:_dListeners)
155 l.deliver(this,_queue);
156 }
157 }
158
159
160
161 public void startBatch()
162 {
163 synchronized(this)
164 {
165 _batch++;
166 }
167 }
168
169
170 public void endBatch()
171 {
172 synchronized(this)
173 {
174 if (--_batch==0 && _queue.size()>0 && _responsesPending<1)
175 resume();
176 }
177 }
178
179
180 public String getConnectionType()
181 {
182 return _type;
183 }
184
185
186
187
188
189 public String getId()
190 {
191 return _id;
192 }
193
194
195 public boolean hasMessages()
196 {
197 return _queue.size()>0;
198 }
199
200
201
202
203
204 public boolean isJSONCommented()
205 {
206 synchronized(this)
207 {
208 return _JSONCommented;
209 }
210 }
211
212
213 public boolean isLocal()
214 {
215 return true;
216 }
217
218
219
220
221
222
223 public void remove(boolean timeout)
224 {
225 synchronized(this)
226 {
227 Client client=_bayeux.removeClient(_id);
228 if (_bayeux.isLogInfo())
229 _bayeux.logInfo("Remove client "+client+" timeout="+timeout);
230 if (_browserId!=null)
231 _bayeux.clientOffBrowser(getBrowserId(),_id);
232 _browserId=null;
233
234 if (_rListeners!=null)
235 for (RemoveListener l:_rListeners)
236 l.removed(_id, timeout);
237 }
238 resume();
239 }
240
241
242 public int responded()
243 {
244 synchronized(this)
245 {
246 return _responsesPending--;
247 }
248 }
249
250
251 public int responsePending()
252 {
253 synchronized(this)
254 {
255 return ++_responsesPending;
256 }
257 }
258
259
260
261
262 public void resume()
263 {
264 }
265
266
267
268
269
270 public void setJSONCommented(boolean commented)
271 {
272 synchronized(this)
273 {
274 _JSONCommented=commented;
275 }
276 }
277
278
279
280
281
282 public int getMessages()
283 {
284 return _queue.size();
285 }
286
287
288 public List<Message> takeMessages()
289 {
290 synchronized(this)
291 {
292 ArrayList<Message> list = new ArrayList<Message>(_queue);
293 _queue.clear();
294 return list;
295 }
296 }
297
298
299
300 public void returnMessages(List<Message> messages)
301 {
302 synchronized(this)
303 {
304 _queue.addAll(0,messages);
305 }
306 }
307
308
309 @Override
310 public String toString()
311 {
312 return _id;
313 }
314
315
316 protected void addSubscription(ChannelImpl channel)
317 {
318 synchronized (this)
319 {
320 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
321 }
322 }
323
324
325 protected void removeSubscription(ChannelImpl channel)
326 {
327 synchronized (this)
328 {
329 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
330 }
331 }
332
333
334 protected void setConnectionType(String type)
335 {
336 synchronized (this)
337 {
338 _type=type;
339 }
340 }
341
342
343 protected void setId(String _id)
344 {
345 synchronized (this)
346 {
347 this._id=_id;
348 }
349 }
350
351
352 protected void unsubscribeAll()
353 {
354 ChannelImpl[] subscriptions;
355 synchronized(this)
356 {
357 _queue.clear();
358 subscriptions=_subscriptions;
359 _subscriptions=new ChannelImpl[0];
360 }
361 for (ChannelImpl channel : subscriptions)
362 channel.unsubscribe(this);
363
364 }
365
366
367 public void setBrowserId(String id)
368 {
369 if (_browserId!=null && !_browserId.equals(id))
370 _bayeux.clientOffBrowser(_browserId,_id);
371 _browserId=id;
372 if (_browserId!=null)
373 _bayeux.clientOnBrowser(_browserId,_id);
374 }
375
376
377 public String getBrowserId()
378 {
379 return _browserId;
380 }
381
382
383 @Override
384 public boolean equals(Object o)
385 {
386 if (!(o instanceof Client))
387 return false;
388 return getId().equals(((Client)o).getId());
389 }
390
391
392
393
394
395
396 public JSON.Literal getAdvice()
397 {
398 return _advice;
399 }
400
401
402
403
404
405 public void setAdvice(JSON.Literal advice)
406 {
407 _advice=advice;
408 }
409
410
411
412 public void addListener(ClientListener listener)
413 {
414 synchronized(this)
415 {
416 if (listener instanceof MessageListener)
417 {
418 if (listener instanceof MessageListener.Synchronous)
419 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
420 else
421 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
422 }
423
424 if (listener instanceof RemoveListener)
425 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
426
427 if (listener instanceof QueueListener)
428 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
429
430 if (listener instanceof DeliverListener)
431 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
432 }
433 }
434
435
436 public void removeListener(ClientListener listener)
437 {
438 synchronized(this)
439 {
440 if (listener instanceof MessageListener)
441 {
442 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
443 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
444 }
445
446 if (listener instanceof RemoveListener)
447 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
448
449 if (listener instanceof QueueListener)
450 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
451 }
452 }
453
454
455 public long getTimeout()
456 {
457 return _timeout;
458 }
459
460
461 public void setTimeout(long timeoutMS)
462 {
463 _timeout=timeoutMS;
464 }
465
466
467 public void setMaxQueue(int maxQueue)
468 {
469 _maxQueue=maxQueue;
470 }
471
472
473 public int getMaxQueue()
474 {
475 return _maxQueue;
476 }
477
478
479 public Queue<Message> getQueue()
480 {
481 return _queue;
482 }
483 }