View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.List;
19  import java.util.Queue;
20  
21  import org.cometd.Bayeux;
22  import org.cometd.Channel;
23  import org.cometd.Client;
24  import org.cometd.ClientListener;
25  import org.cometd.DeliverListener;
26  import org.cometd.Extension;
27  import org.cometd.Message;
28  import org.cometd.MessageListener;
29  import org.cometd.QueueListener;
30  import org.cometd.RemoveListener;
31  import org.mortbay.log.Log;
32  import org.mortbay.util.ArrayQueue;
33  import org.mortbay.util.LazyList;
34  import org.mortbay.util.ajax.JSON;
35  
36  /* ------------------------------------------------------------ */
37  /**
38   *
39   * @author gregw
40   */
41  public class ClientImpl implements Client
42  {
43      private String _id;
44      private String _type;
45      private int _responsesPending;
46      private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write
47      private RemoveListener[] _rListeners; // copy on write
48      private MessageListener[] _syncMListeners; // copy on write
49      private MessageListener[] _asyncMListeners; // copy on write
50      private QueueListener[] _qListeners; // copy on write
51      private DeliverListener[] _dListeners; // copy on write
52      protected AbstractBayeux _bayeux;
53      private String _browserId;
54      private JSON.Literal _advice;
55      private int _batch;
56      private int _maxQueue;
57      private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
58      private long _timeout = -1;
59      private long _interval = -1;
60      private int _lag;
61      private Extension[] _extensions;
62  
63      private boolean _deliverViaMetaConnectOnly;
64      private volatile boolean _isExpired;
65  
66      // manipulated and synchronized by AbstractBayeux
67      int _adviseVersion;
68  
69      /* ------------------------------------------------------------ */
70      protected ClientImpl(AbstractBayeux bayeux)
71      {
72          _bayeux=bayeux;
73          _maxQueue=bayeux.getMaxClientQueue();
74      }
75  
76      /* ------------------------------------------------------------ */
77      protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
78      {
79          _bayeux=bayeux;
80          _maxQueue=0;
81      }
82  
83      /* ------------------------------------------------------------ */
84      public void addExtension(Extension ext)
85      {
86          _extensions=(Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
87      }
88  
89      public void removeExtension(Extension ext)
90      {
91          _extensions=(Extension[])LazyList.removeFromArray(_extensions,ext);
92      }
93  
94      /* ------------------------------------------------------------ */
95      Extension[] getExtensions()
96      {
97          return _extensions;
98      }
99  
100     /* ------------------------------------------------------------ */
101     public void deliver(Client from, String toChannel, Object data, String id)
102     {
103         MessageImpl message=_bayeux.newMessage();
104         message.put(Bayeux.CHANNEL_FIELD,toChannel);
105         message.put(Bayeux.DATA_FIELD,data);
106         if (id != null)
107             message.put(Bayeux.ID_FIELD,id);
108 
109         Message m=_bayeux.extendSendBayeux(from,message);
110         if (m != null)
111             doDelivery(from,m);
112         if (m instanceof MessageImpl)
113             ((MessageImpl)m).decRef();
114     }
115 
116     /* ------------------------------------------------------------ */
117     public void deliverLazy(Client from, String toChannel, Object data, String id)
118     {
119         MessageImpl message=_bayeux.newMessage();
120         message.put(Bayeux.CHANNEL_FIELD,toChannel);
121         message.put(Bayeux.DATA_FIELD,data);
122         if (id != null)
123             message.put(Bayeux.ID_FIELD,id);
124         message.setLazy(true);
125         Message m=_bayeux.extendSendBayeux(from,message);
126         if (m != null)
127             doDelivery(from,m);
128         if (m instanceof MessageImpl)
129             ((MessageImpl)m).decRef();
130     }
131 
132     /* ------------------------------------------------------------ */
133     protected void doDelivery(Client from, final Message msg)
134     {
135         final Message message=_bayeux.extendSendClient(from,this,msg);
136         if (message == null)
137             return;
138 
139         MessageListener[] alisteners=null;
140         synchronized(this)
141         {
142             if (_maxQueue < 0)
143             {
144                 // No queue limit, so always queue the message
145                 ((MessageImpl)message).incRef();
146                 _queue.addUnsafe(message);
147             }
148             else
149             {
150                 // We have a queue limit,
151                 boolean queue;
152                 if (_queue.size() >= _maxQueue)
153                 {
154                     // We are over the limit, so consult listeners
155                     if (_qListeners != null && _qListeners.length > 0)
156                     {
157                         queue=true;
158                         for (QueueListener l : _qListeners)
159                             queue &= notifyQueueListener(l, from, message);
160                     }
161                     else
162                         queue=false;
163                 }
164                 else
165                     // we are under limit, so queue the messages.
166                     queue=true;
167 
168                 // queue the message if we are meant to
169                 if (queue)
170                 {
171                     ((MessageImpl)message).incRef();
172                     _queue.addUnsafe(message);
173                 }
174             }
175 
176             // deliver synchronized
177             if (_syncMListeners != null)
178                 for (MessageListener l : _syncMListeners)
179                     notifyMessageListener(l, from, message);
180             alisteners=_asyncMListeners;
181 
182             if (_batch == 0 && _responsesPending < 1 && _queue.size() > 0)
183             {
184                 if (((MessageImpl)message).isLazy())
185                     lazyResume();
186                 else
187                     resume();
188             }
189         }
190 
191         // deliver unsynchronized
192         if (alisteners != null)
193             for (MessageListener l : alisteners)
194                 notifyMessageListener(l, from, message);
195     }
196 
197     private boolean notifyQueueListener(QueueListener listener, Client from, Message message)
198     {
199         try
200         {
201             return listener.queueMaxed(from, this, message);
202         }
203         catch (Throwable x)
204         {
205             Log.warn(x);
206             return false;
207         }
208     }
209 
210     private void notifyMessageListener(MessageListener listener, Client from, Message message)
211     {
212         try
213         {
214             listener.deliver(from, this, message);
215         }
216         catch (Throwable x)
217         {
218             Log.warn(x);
219         }
220     }
221 
222     /* ------------------------------------------------------------ */
223     public void doDeliverListeners()
224     {
225         synchronized(this)
226         {
227             if (_dListeners != null)
228                 for (DeliverListener l : _dListeners)
229                     notifyDeliverListener(l, _queue);
230         }
231     }
232 
233     private void notifyDeliverListener(DeliverListener listener, Queue<Message> queue)
234     {
235         try
236         {
237             listener.deliver(this, queue);
238         }
239         catch (Throwable x)
240         {
241             Log.warn(x);
242         }
243     }
244 
245     /* ------------------------------------------------------------ */
246     public void setMetaConnectDeliveryOnly(boolean deliverViaMetaConnectOnly)
247     {
248         _deliverViaMetaConnectOnly=deliverViaMetaConnectOnly;
249     }
250 
251     /* ------------------------------------------------------------ */
252     public boolean isMetaConnectDeliveryOnly()
253     {
254         return _deliverViaMetaConnectOnly;
255     }
256 
257     /* ------------------------------------------------------------ */
258     public void startBatch()
259     {
260         synchronized(this)
261         {
262             _batch++;
263         }
264     }
265 
266     /* ------------------------------------------------------------ */
267     public void endBatch()
268     {
269         synchronized(this)
270         {
271             if (--_batch == 0 && _responsesPending < 1)
272             {
273                 batch:switch(_queue.size())
274                 {
275                     case 0:
276                         break;
277                     case 1:
278                         if (((MessageImpl)_queue.get(0)).isLazy())
279                             lazyResume();
280                         else
281                             resume();
282                         break;
283                     default:
284                         for (int i=_queue.size();i-->0;)
285                         {
286                             if (!((MessageImpl)_queue.get(i)).isLazy())
287                             {
288                                 resume();
289                                 break batch;
290                             }
291                         }
292                         lazyResume();
293                 }
294             }
295         }
296     }
297 
298     /* ------------------------------------------------------------ */
299     public String getConnectionType()
300     {
301         return _type;
302     }
303 
304     /* ------------------------------------------------------------ */
305     /*
306      * (non-Javadoc)
307      *
308      * @see org.mortbay.cometd.C#getId()
309      */
310     public String getId()
311     {
312         return _id;
313     }
314 
315     /* ------------------------------------------------------------ */
316     public boolean hasMessages()
317     {
318         return _queue.size() > 0;
319     }
320 
321     /* ------------------------------------------------------------ */
322     public boolean hasNonLazyMessages()
323     {
324         synchronized(this)
325         {
326             for (int i=_queue.size(); i-- > 0;)
327             {
328                 if (!((MessageImpl)_queue.getUnsafe(i)).isLazy())
329                     return true;
330             }
331         }
332         return false;
333     }
334 
335     /* ------------------------------------------------------------ */
336     public boolean isLocal()
337     {
338         return true;
339     }
340 
341     /* ------------------------------------------------------------ */
342     /*
343      * (non-Javadoc)
344      *
345      * @see org.cometd.Client#disconnect()
346      */
347     public void disconnect()
348     {
349         synchronized(this)
350         {
351             if (_bayeux.hasClient(_id))
352                 remove(false);
353         }
354     }
355 
356     /* ------------------------------------------------------------ */
357     /*
358      * (non-Javadoc)
359      *
360      * @see dojox.cometd.Client#remove(boolean)
361      */
362     public void remove(boolean timeout)
363     {
364         _isExpired=timeout;
365         Client client=_bayeux.removeClient(_id);
366 
367         if (client != null && _bayeux.isLogInfo())
368             _bayeux.logInfo("Remove client " + client + " timeout=" + timeout);
369 
370         final String browser_id;
371         final RemoveListener[] listeners;
372         synchronized(this)
373         {
374             browser_id=_browserId;
375             _browserId=null;
376             listeners=_rListeners;
377         }
378 
379         if (browser_id != null)
380             _bayeux.clientOffBrowser(browser_id,_id);
381         if (listeners != null)
382             for (RemoveListener l : listeners)
383                 notifyRemoveListener(l, _id, timeout);
384 
385         resume();
386     }
387 
388     private void notifyRemoveListener(RemoveListener listener, String clientId, boolean timeout)
389     {
390         try
391         {
392             listener.removed(clientId, timeout);
393         }
394         catch (Throwable x)
395         {
396             Log.warn(x);
397         }
398     }
399 
400     /* ------------------------------------------------------------ */
401     public boolean isExpired()
402     {
403         return _isExpired;
404     }
405 
406     /* ------------------------------------------------------------ */
407     public int responded()
408     {
409         synchronized(this)
410         {
411             return _responsesPending--;
412         }
413     }
414 
415     /* ------------------------------------------------------------ */
416     public int responsePending()
417     {
418         synchronized(this)
419         {
420             return ++_responsesPending;
421         }
422     }
423 
424     /* ------------------------------------------------------------ */
425     /**
426      * Called by deliver to resume anything waiting on this client lazily
427      */
428     public void lazyResume()
429     {
430     }
431 
432     /* ------------------------------------------------------------ */
433     /**
434      * Called by deliver to resume anything waiting on this client.
435      */
436     public void resume()
437     {
438     }
439 
440     /* ------------------------------------------------------------ */
441     /*
442      * @return the number of messages queued
443      */
444     public int getMessages()
445     {
446         return _queue.size();
447     }
448 
449     /* ------------------------------------------------------------ */
450     public List<Message> takeMessages()
451     {
452         synchronized(this)
453         {
454             ArrayList<Message> list=new ArrayList<Message>(_queue);
455             _queue.clear();
456             return list;
457         }
458     }
459 
460     /* ------------------------------------------------------------ */
461     public void returnMessages(List<Message> messages)
462     {
463         synchronized(this)
464         {
465             _queue.addAll(0,messages);
466         }
467     }
468 
469     /* ------------------------------------------------------------ */
470     @Override
471     public String toString()
472     {
473         return _id;
474     }
475 
476     /* ------------------------------------------------------------ */
477     protected void addSubscription(ChannelImpl channel)
478     {
479         synchronized(this)
480         {
481             _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
482         }
483     }
484 
485     /* ------------------------------------------------------------ */
486     protected void removeSubscription(ChannelImpl channel)
487     {
488         synchronized(this)
489         {
490             _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
491         }
492     }
493 
494     /* ------------------------------------------------------------ */
495     protected void setConnectionType(String type)
496     {
497         synchronized(this)
498         {
499             _type=type;
500         }
501     }
502 
503     /* ------------------------------------------------------------ */
504     protected void setId(String id)
505     {
506         synchronized(this)
507         {
508             _id=id;
509         }
510     }
511 
512     /* ------------------------------------------------------------ */
513     public void unsubscribeAll()
514     {
515         ChannelImpl[] subscriptions;
516         synchronized(this)
517         {
518             subscriptions=_subscriptions;
519             _subscriptions=new ChannelImpl[0];
520         }
521         for (ChannelImpl channel : subscriptions)
522             channel.unsubscribe(this);
523 
524     }
525 
526     /* ------------------------------------------------------------ */
527     public void setBrowserId(String id)
528     {
529         if (_browserId != null && !_browserId.equals(id))
530             _bayeux.clientOffBrowser(_browserId,_id);
531         _browserId=id;
532         if (_browserId != null)
533             _bayeux.clientOnBrowser(_browserId,_id);
534     }
535 
536     /* ------------------------------------------------------------ */
537     public String getBrowserId()
538     {
539         return _browserId;
540     }
541 
542     /* ------------------------------------------------------------ */
543     @Override
544     public boolean equals(Object o)
545     {
546         if (!(o instanceof Client))
547             return false;
548         return getId().equals(((Client)o).getId());
549     }
550 
551     /* ------------------------------------------------------------ */
552     /**
553      * Get the advice specific for this Client
554      *
555      * @return advice specific for this client or null
556      */
557     public JSON.Literal getAdvice()
558     {
559         return _advice;
560     }
561 
562     /* ------------------------------------------------------------ */
563     /**
564      * @param advice
565      *            specific for this client
566      */
567     public void setAdvice(JSON.Literal advice)
568     {
569         _advice=advice;
570     }
571 
572     /* ------------------------------------------------------------ */
573     public void addListener(ClientListener listener)
574     {
575         synchronized(this)
576         {
577             if (listener instanceof MessageListener)
578             {
579                 if (listener instanceof MessageListener.Synchronous)
580                     _syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
581                 else
582                     _asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
583             }
584 
585             if (listener instanceof RemoveListener)
586                 _rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
587 
588             if (listener instanceof QueueListener)
589                 _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
590 
591             if (listener instanceof DeliverListener)
592                 _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
593         }
594     }
595 
596     /* ------------------------------------------------------------ */
597     public void removeListener(ClientListener listener)
598     {
599         synchronized(this)
600         {
601             if (listener instanceof MessageListener)
602             {
603                 _syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
604                 _asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
605             }
606 
607             if (listener instanceof RemoveListener)
608                 _rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
609 
610             if (listener instanceof QueueListener)
611                 _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
612         }
613     }
614 
615     /* ------------------------------------------------------------ */
616     public long getInterval()
617     {
618         return _interval;
619     }
620 
621     /* ------------------------------------------------------------ */
622     /**
623      * Set per client interval
624      *
625      * @param intervalMS
626      *            timeout in MS for longpoll duration or 0 to use default from
627      *            {@link AbstractBayeux#getMaxInterval()}.
628      */
629     public void setInterval(long intervalMS)
630     {
631         _interval=intervalMS;
632     }
633 
634     /* ------------------------------------------------------------ */
635     public long getTimeout()
636     {
637         return _timeout;
638     }
639 
640     /* ------------------------------------------------------------ */
641     /**
642      * Set per client timeout
643      *
644      * @param timeoutMS
645      *            timeout in MS for longpoll duration or 0 to use default from
646      *            {@link AbstractBayeux#getTimeout()}.
647      */
648     public void setTimeout(long timeoutMS)
649     {
650         _timeout=timeoutMS;
651     }
652 
653     /* ------------------------------------------------------------ */
654     public void setMaxQueue(int maxQueue)
655     {
656         _maxQueue=maxQueue;
657     }
658 
659     /* ------------------------------------------------------------ */
660     public int getMaxQueue()
661     {
662         return _maxQueue;
663     }
664 
665     /* ------------------------------------------------------------ */
666     public Queue<Message> getQueue()
667     {
668         return _queue;
669     }
670 
671     /* ------------------------------------------------------------ */
672     /**
673      * @see org.mortbay.cometd.ext.TimesyncExtension
674      * @return The lag in ms as measured by an extension like the
675      *         TimesyncExtension
676      */
677     public int getLag()
678     {
679         return _lag;
680     }
681 
682     /* ------------------------------------------------------------ */
683     /**
684      * @see org.mortbay.cometd.ext.TimesyncExtension
685      * @param lag
686      *            in ms
687      */
688     public void setLag(int lag)
689     {
690         _lag=lag;
691     }
692 
693     /* ------------------------------------------------------------ */
694     /**
695      * Get the subscribed to channels
696      *
697      * @return A copied array of the channels to which this client is subscribed
698      */
699     public Channel[] getSubscriptions()
700     {
701         ChannelImpl[] subscriptions=_subscriptions;
702         if (subscriptions == null)
703             return null;
704         Channel[] channels=new Channel[subscriptions.length];
705         System.arraycopy(subscriptions,0,channels,0,subscriptions.length);
706         return channels;
707     }
708 
709 }