1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Queue;
20
21 import org.cometd.Bayeux;
22 import org.cometd.Channel;
23 import org.cometd.Client;
24 import org.cometd.ClientListener;
25 import org.cometd.DeliverListener;
26 import org.cometd.Extension;
27 import org.cometd.Message;
28 import org.cometd.MessageListener;
29 import org.cometd.QueueListener;
30 import org.cometd.RemoveListener;
31 import org.mortbay.log.Log;
32 import org.mortbay.util.ArrayQueue;
33 import org.mortbay.util.LazyList;
34 import org.mortbay.util.ajax.JSON;
35
36
37
38
39
40
41 public class ClientImpl implements Client
42 {
43 private String _id;
44 private String _type;
45 private int _responsesPending;
46 private ChannelImpl[] _subscriptions=new ChannelImpl[0];
47 private RemoveListener[] _rListeners;
48 private MessageListener[] _syncMListeners;
49 private MessageListener[] _asyncMListeners;
50 private QueueListener[] _qListeners;
51 private DeliverListener[] _dListeners;
52 protected AbstractBayeux _bayeux;
53 private String _browserId;
54 private JSON.Literal _advice;
55 private int _batch;
56 private int _maxQueue;
57 private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
58 private long _timeout = -1;
59 private long _interval = -1;
60 private int _lag;
61 private Extension[] _extensions;
62
63 private boolean _deliverViaMetaConnectOnly;
64 private volatile boolean _isExpired;
65
66
67 int _adviseVersion;
68
69
70 protected ClientImpl(AbstractBayeux bayeux)
71 {
72 _bayeux=bayeux;
73 _maxQueue=bayeux.getMaxClientQueue();
74 }
75
76
77 protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
78 {
79 _bayeux=bayeux;
80 _maxQueue=0;
81 }
82
83
84 public void addExtension(Extension ext)
85 {
86 _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
87 }
88
89 public void removeExtension(Extension ext)
90 {
91 _extensions=(Extension[])LazyList.removeFromArray(_extensions,ext);
92 }
93
94
95 Extension[] getExtensions()
96 {
97 return _extensions;
98 }
99
100
101 public void deliver(Client from, String toChannel, Object data, String id)
102 {
103 MessageImpl message=_bayeux.newMessage();
104 message.put(Bayeux.CHANNEL_FIELD,toChannel);
105 message.put(Bayeux.DATA_FIELD,data);
106 if (id != null)
107 message.put(Bayeux.ID_FIELD,id);
108
109 Message m=_bayeux.extendSendBayeux(from,message);
110 if (m != null)
111 doDelivery(from,m);
112 if (m instanceof MessageImpl)
113 ((MessageImpl)m).decRef();
114 }
115
116
117 public void deliverLazy(Client from, String toChannel, Object data, String id)
118 {
119 MessageImpl message=_bayeux.newMessage();
120 message.put(Bayeux.CHANNEL_FIELD,toChannel);
121 message.put(Bayeux.DATA_FIELD,data);
122 if (id != null)
123 message.put(Bayeux.ID_FIELD,id);
124 message.setLazy(true);
125 Message m=_bayeux.extendSendBayeux(from,message);
126 if (m != null)
127 doDelivery(from,m);
128 if (m instanceof MessageImpl)
129 ((MessageImpl)m).decRef();
130 }
131
132
133 protected void doDelivery(Client from, final Message msg)
134 {
135 final Message message=_bayeux.extendSendClient(from,this,msg);
136 if (message == null)
137 return;
138
139 MessageListener[] alisteners=null;
140 synchronized(this)
141 {
142 if (_maxQueue < 0)
143 {
144
145 ((MessageImpl)message).incRef();
146 _queue.addUnsafe(message);
147 }
148 else
149 {
150
151 boolean queue;
152 if (_queue.size() >= _maxQueue)
153 {
154
155 if (_qListeners != null && _qListeners.length > 0)
156 {
157 queue=true;
158 for (QueueListener l : _qListeners)
159 queue &= notifyQueueListener(l, from, message);
160 }
161 else
162 queue=false;
163 }
164 else
165
166 queue=true;
167
168
169 if (queue)
170 {
171 ((MessageImpl)message).incRef();
172 _queue.addUnsafe(message);
173 }
174 }
175
176
177 if (_syncMListeners != null)
178 for (MessageListener l : _syncMListeners)
179 notifyMessageListener(l, from, message);
180 alisteners=_asyncMListeners;
181
182 if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
183 {
184 if (((MessageImpl)message).isLazy())
185 lazyResume();
186 else
187 resume();
188 }
189 }
190
191
192 if (alisteners != null)
193 for (MessageListener l : alisteners)
194 notifyMessageListener(l, from, message);
195 }
196
197 private boolean notifyQueueListener(QueueListener listener, Client from, Message message)
198 {
199 try
200 {
201 return listener.queueMaxed(from, this, message);
202 }
203 catch (Throwable x)
204 {
205 Log.warn(x);
206 return false;
207 }
208 }
209
210 private void notifyMessageListener(MessageListener listener, Client from, Message message)
211 {
212 try
213 {
214 listener.deliver(from, this, message);
215 }
216 catch (Throwable x)
217 {
218 Log.warn(x);
219 }
220 }
221
222
223 public void doDeliverListeners()
224 {
225 synchronized(this)
226 {
227 if (_dListeners != null)
228 for (DeliverListener l : _dListeners)
229 notifyDeliverListener(l, _queue);
230 }
231 }
232
233 private void notifyDeliverListener(DeliverListener listener, Queue<Message> queue)
234 {
235 try
236 {
237 listener.deliver(this, queue);
238 }
239 catch (Throwable x)
240 {
241 Log.warn(x);
242 }
243 }
244
245
246 public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
247 {
248 _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
249 }
250
251
252 public boolean isMetaConnectDeliveryOnly()
253 {
254 return _deliverViaMetaConnectOnly;
255 }
256
257
258 public void startBatch()
259 {
260 synchronized(this)
261 {
262 _batch++;
263 }
264 }
265
266
267 public void endBatch()
268 {
269 synchronized(this)
270 {
271 if (--_batch == 0 && _responsesPending < 1)
272 {
273 batch:switch(_queue.size())
274 {
275 case 0:
276 break;
277 case 1:
278 if (((MessageImpl)_queue.get(0)).isLazy())
279 lazyResume();
280 else
281 resume();
282 break;
283 default:
284 for (int i=_queue.size();i-->0;)
285 {
286 if (!((MessageImpl)_queue.get(i)).isLazy())
287 {
288 resume();
289 break batch;
290 }
291 }
292 lazyResume();
293 }
294 }
295 }
296 }
297
298
299 public String getConnectionType()
300 {
301 return _type;
302 }
303
304
305
306
307
308
309
310 public String getId()
311 {
312 return _id;
313 }
314
315
316 public boolean hasMessages()
317 {
318 return _queue.size() > 0;
319 }
320
321
322 public boolean hasNonLazyMessages()
323 {
324 synchronized(this)
325 {
326 for (int i=_queue.size(); i-- > 0;)
327 {
328 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
329 return true;
330 }
331 }
332 return false;
333 }
334
335
336 public boolean isLocal()
337 {
338 return true;
339 }
340
341
342
343
344
345
346
347 public void disconnect()
348 {
349 synchronized(this)
350 {
351 if (_bayeux.hasClient(_id))
352 remove(false);
353 }
354 }
355
356
357
358
359
360
361
362 public void remove(boolean timeout)
363 {
364 _isExpired=timeout;
365 Client client=_bayeux.removeClient(_id);
366
367 if (client != null && _bayeux.isLogInfo())
368 _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
369
370 final String browser_id;
371 final RemoveListener[] listeners;
372 synchronized(this)
373 {
374 browser_id=_browserId;
375 _browserId=null;
376 listeners=_rListeners;
377 }
378
379 if (browser_id != null)
380 _bayeux.clientOffBrowser(browser_id,_id);
381 if (listeners != null)
382 for (RemoveListener l : listeners)
383 notifyRemoveListener(l, _id, timeout);
384
385 resume();
386 }
387
388 private void notifyRemoveListener(RemoveListener listener, String clientId, boolean timeout)
389 {
390 try
391 {
392 listener.removed(clientId, timeout);
393 }
394 catch (Throwable x)
395 {
396 Log.warn(x);
397 }
398 }
399
400
401 public boolean isExpired()
402 {
403 return _isExpired;
404 }
405
406
407 public int responded()
408 {
409 synchronized(this)
410 {
411 return _responsesPending--;
412 }
413 }
414
415
416 public int responsePending()
417 {
418 synchronized(this)
419 {
420 return ++_responsesPending;
421 }
422 }
423
424
425
426
427
428 public void lazyResume()
429 {
430 }
431
432
433
434
435
436 public void resume()
437 {
438 }
439
440
441
442
443
444 public int getMessages()
445 {
446 return _queue.size();
447 }
448
449
450 public List<Message> takeMessages()
451 {
452 synchronized(this)
453 {
454 ArrayList<Message> list=new ArrayList<Message>(_queue);
455 _queue.clear();
456 return list;
457 }
458 }
459
460
461 public void returnMessages(List<Message> messages)
462 {
463 synchronized(this)
464 {
465 _queue.addAll(0,messages);
466 }
467 }
468
469
470 @Override
471 public String toString()
472 {
473 return _id;
474 }
475
476
477 protected void addSubscription(ChannelImpl channel)
478 {
479 synchronized(this)
480 {
481 _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
482 }
483 }
484
485
486 protected void removeSubscription(ChannelImpl channel)
487 {
488 synchronized(this)
489 {
490 _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
491 }
492 }
493
494
495 protected void setConnectionType(String type)
496 {
497 synchronized(this)
498 {
499 _type=type;
500 }
501 }
502
503
504 protected void setId(String id)
505 {
506 synchronized(this)
507 {
508 _id=id;
509 }
510 }
511
512
513 public void unsubscribeAll()
514 {
515 ChannelImpl[] subscriptions;
516 synchronized(this)
517 {
518 subscriptions=_subscriptions;
519 _subscriptions=new ChannelImpl[0];
520 }
521 for (ChannelImpl channel : subscriptions)
522 channel.unsubscribe(this);
523
524 }
525
526
527 public void setBrowserId(String id)
528 {
529 if (_browserId != null && !_browserId.equals(id))
530 _bayeux.clientOffBrowser(_browserId,_id);
531 _browserId=id;
532 if (_browserId != null)
533 _bayeux.clientOnBrowser(_browserId,_id);
534 }
535
536
537 public String getBrowserId()
538 {
539 return _browserId;
540 }
541
542
543 @Override
544 public boolean equals(Object o)
545 {
546 if (!(o instanceof Client))
547 return false;
548 return getId().equals(((Client)o).getId());
549 }
550
551
552
553
554
555
556
557 public JSON.Literal getAdvice()
558 {
559 return _advice;
560 }
561
562
563
564
565
566
567 public void setAdvice(JSON.Literal advice)
568 {
569 _advice=advice;
570 }
571
572
573 public void addListener(ClientListener listener)
574 {
575 synchronized(this)
576 {
577 if (listener instanceof MessageListener)
578 {
579 if (listener instanceof MessageListener.Synchronous)
580 _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
581 else
582 _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
583 }
584
585 if (listener instanceof RemoveListener)
586 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
587
588 if (listener instanceof QueueListener)
589 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
590
591 if (listener instanceof DeliverListener)
592 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
593 }
594 }
595
596
597 public void removeListener(ClientListener listener)
598 {
599 synchronized(this)
600 {
601 if (listener instanceof MessageListener)
602 {
603 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
604 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
605 }
606
607 if (listener instanceof RemoveListener)
608 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
609
610 if (listener instanceof QueueListener)
611 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
612 }
613 }
614
615
616 public long getInterval()
617 {
618 return _interval;
619 }
620
621
622
623
624
625
626
627
628
629 public void setInterval(long intervalMS)
630 {
631 _interval=intervalMS;
632 }
633
634
635 public long getTimeout()
636 {
637 return _timeout;
638 }
639
640
641
642
643
644
645
646
647
648 public void setTimeout(long timeoutMS)
649 {
650 _timeout=timeoutMS;
651 }
652
653
654 public void setMaxQueue(int maxQueue)
655 {
656 _maxQueue=maxQueue;
657 }
658
659
660 public int getMaxQueue()
661 {
662 return _maxQueue;
663 }
664
665
666 public Queue<Message> getQueue()
667 {
668 return _queue;
669 }
670
671
672
673
674
675
676
677 public int getLag()
678 {
679 return _lag;
680 }
681
682
683
684
685
686
687
688 public void setLag(int lag)
689 {
690 _lag=lag;
691 }
692
693
694
695
696
697
698
699 public Channel[] getSubscriptions()
700 {
701 ChannelImpl[] subscriptions=_subscriptions;
702 if (subscriptions == null)
703 return null;
704 Channel[] channels=new Channel[subscriptions.length];
705 System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
706 return channels;
707 }
708
709 }