View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.ArrayList;
18  import java.util.Collections;
19  import java.util.List;
20  import java.util.Queue;
21  
22  import org.cometd.Bayeux;
23  import org.cometd.Client;
24  import org.cometd.DeliverListener;
25  import org.cometd.Extension;
26  import org.cometd.ClientListener;
27  import org.cometd.Message;
28  import org.cometd.MessageListener;
29  import org.cometd.QueueListener;
30  import org.cometd.RemoveListener;
31  import org.mortbay.util.ArrayQueue;
32  import org.mortbay.util.LazyList;
33  import org.mortbay.util.ajax.JSON;
34  
35  
36  
37  /* ------------------------------------------------------------ */
38  /**
39   * 
40   * @author gregw
41   */
42  public class ClientImpl implements Client
43  {
44      private String _id;
45      private String _type;
46      private int _responsesPending;
47      private ChannelImpl[] _subscriptions=new ChannelImpl[0]; // copy on write
48      private boolean _JSONCommented;
49      private RemoveListener[] _rListeners; // copy on write
50      private MessageListener[] _syncMListeners; // copy on write
51      private MessageListener[] _asyncMListeners; // copy on write
52      private QueueListener[] _qListeners; // copy on write
53      private DeliverListener[] _dListeners; // copy on write
54      protected AbstractBayeux _bayeux;
55      private String _browserId;
56      private JSON.Literal _advice;
57      private int _batch;
58      private int _maxQueue;
59      private ArrayQueue<Message> _queue=new ArrayQueue<Message>(8,16,this);
60      private long _timeout;
61      
62      // manipulated and synchronized by AbstractBayeux
63      int _adviseVersion;
64  
65      /* ------------------------------------------------------------ */
66      protected ClientImpl(AbstractBayeux bayeux)
67      {
68          _bayeux=bayeux;
69          _maxQueue=bayeux.getMaxClientQueue();
70          _bayeux.addClient(this,null);
71          if (_bayeux.isLogInfo())
72              _bayeux.logInfo("newClient: "+this);
73      }
74      
75      /* ------------------------------------------------------------ */
76      protected ClientImpl(AbstractBayeux bayeux, String idPrefix)
77      {
78          _bayeux=bayeux;
79          _maxQueue=0;
80          
81          _bayeux.addClient(this,idPrefix);
82          
83          if (_bayeux.isLogInfo())
84              _bayeux.logInfo("newClient: "+this);
85          
86      }
87  
88      /* ------------------------------------------------------------ */
89      public void deliver(Client from, String toChannel, Object data, String id)
90      {
91          // TODO recycle maps
92          Message message=_bayeux.newMessage();
93          message.put(Bayeux.CHANNEL_FIELD,toChannel);
94          message.put(Bayeux.DATA_FIELD,data);
95          if (id!=null)   
96              message.put(Bayeux.ID_FIELD,id);
97  
98          for (Extension e:_bayeux._extensions)
99              message=e.send(message);
100         doDelivery(from,message);
101         
102         ((MessageImpl)message).decRef();
103     }
104     
105     /* ------------------------------------------------------------ */
106     protected void doDelivery(Client from, Message message)
107     {
108         MessageListener[] alisteners=null;
109         synchronized(this)
110         {
111             ((MessageImpl)message).incRef();
112             
113             if (_maxQueue<0)
114             {
115                 _queue.addUnsafe(message);
116             }
117             else
118             {
119                 boolean add=_maxQueue>0;
120                 if (_queue.size()>=_maxQueue && _qListeners!=null)
121                 {
122                     for (QueueListener l : _qListeners)
123                     {
124                         add &= l.queueMaxed((Client)this,message);
125                     }
126                 }
127                 
128                 if (add)
129                     _queue.addUnsafe(message);
130             }    
131 
132             // deliver unsynchronized
133             if (_syncMListeners!=null)
134                 for (MessageListener l:_syncMListeners)
135                     l.deliver(from,this,message);
136             alisteners=_asyncMListeners;
137              
138             if (_batch==0 &&  _responsesPending<1 && _queue.size()>0)
139                 resume();
140         }
141         
142         // deliver unsynchronized
143         if (alisteners!=null)
144             for (MessageListener l:alisteners)
145                 l.deliver(from,this,message);
146     }
147 
148     /* ------------------------------------------------------------ */
149     public void doDeliverListeners()
150     {
151         synchronized (this)
152         {
153             if (_dListeners!=null)
154                 for (DeliverListener l:_dListeners)
155                     l.deliver(this,_queue);
156         }
157     }
158 
159 
160     /* ------------------------------------------------------------ */
161     public void startBatch()
162     {
163         synchronized(this)
164         {
165             _batch++;
166         }
167     }
168     
169     /* ------------------------------------------------------------ */
170     public void endBatch()
171     {
172         synchronized(this)
173         {
174             if (--_batch==0 && _queue.size()>0 && _responsesPending<1)
175                 resume();
176         }
177     }
178     
179     /* ------------------------------------------------------------ */
180     public String getConnectionType()
181     {
182         return _type;
183     }
184     
185     /* ------------------------------------------------------------ */
186     /* (non-Javadoc)
187      * @see org.mortbay.cometd.C#getId()
188      */
189     public String getId()
190     {
191         return _id;
192     }
193    
194     /* ------------------------------------------------------------ */
195     public boolean hasMessages()
196     {
197         return _queue.size()>0;
198     }
199     
200     /* ------------------------------------------------------------ */
201     /**
202      * @return the commented
203      */
204     public boolean isJSONCommented()
205     {
206         synchronized(this)
207         {
208             return _JSONCommented;
209         }
210     }
211 
212     /* ------------------------------------------------------------ */
213     public boolean isLocal()
214     {
215         return true;
216     }
217        
218     /* ------------------------------------------------------------ */
219     /* ------------------------------------------------------------ */
220     /* (non-Javadoc)
221      * @see dojox.cometd.Client#remove(boolean)
222      */
223     public void remove(boolean timeout)
224     {
225         synchronized(this)
226         {
227             Client client=_bayeux.removeClient(_id);   
228             if (_bayeux.isLogInfo())
229                 _bayeux.logInfo("Remove client "+client+" timeout="+timeout); 
230             if (_browserId!=null)
231                 _bayeux.clientOffBrowser(getBrowserId(),_id);
232             _browserId=null;
233             
234             if (_rListeners!=null)
235                 for (RemoveListener l:_rListeners)
236                     l.removed(_id, timeout);
237         }
238         resume();
239     }
240     
241     /* ------------------------------------------------------------ */
242     public int responded()
243     {
244         synchronized(this)
245         {
246             return _responsesPending--;
247         }
248     }
249 
250     /* ------------------------------------------------------------ */
251     public int responsePending()
252     {
253         synchronized(this)
254         {
255             return ++_responsesPending;
256         }
257     }
258     
259     /* ------------------------------------------------------------ */
260     /** Called by deliver to resume anything waiting on this client.
261      */
262     public void resume()
263     {
264     }
265 
266     /* ------------------------------------------------------------ */
267     /**
268      * @param commented the commented to set
269      */
270     public void setJSONCommented(boolean commented)
271     {
272         synchronized(this)
273         {
274             _JSONCommented=commented;
275         }
276     }
277 
278     /* ------------------------------------------------------------ */
279     /*
280      * @return the number of messages queued
281      */
282     public int getMessages()
283     {
284         return _queue.size();
285     }
286     
287     /* ------------------------------------------------------------ */
288     public List<Message> takeMessages()
289     {
290         synchronized(this)
291         {
292             ArrayList<Message> list = new ArrayList<Message>(_queue);
293             _queue.clear();
294             return list;
295         }
296     }
297     
298 
299     /* ------------------------------------------------------------ */
300     public void returnMessages(List<Message> messages)
301     {
302         synchronized(this)
303         {
304             _queue.addAll(0,messages);
305         }
306     }
307         
308     /* ------------------------------------------------------------ */
309     @Override
310     public String toString()
311     {
312         return _id;
313     }
314 
315     /* ------------------------------------------------------------ */
316     protected void addSubscription(ChannelImpl channel)
317     {
318         synchronized (this)
319         {
320             _subscriptions=(ChannelImpl[])LazyList.addToArray(_subscriptions,channel,null);
321         }
322     }
323 
324     /* ------------------------------------------------------------ */
325     protected void removeSubscription(ChannelImpl channel)
326     {
327         synchronized (this)
328         {
329             _subscriptions=(ChannelImpl[])LazyList.removeFromArray(_subscriptions,channel);
330         }
331     }
332 
333     /* ------------------------------------------------------------ */
334     protected void setConnectionType(String type)
335     {
336         synchronized (this)
337         {
338             _type=type;
339         }
340     }
341 
342     /* ------------------------------------------------------------ */
343     protected void setId(String _id)
344     {
345         synchronized (this)
346         {
347             this._id=_id;
348         }
349     }
350 
351     /* ------------------------------------------------------------ */
352     protected void unsubscribeAll()
353     {
354         ChannelImpl[] subscriptions;
355         synchronized(this)
356         {
357             _queue.clear();
358             subscriptions=_subscriptions;
359             _subscriptions=new ChannelImpl[0];
360         }
361         for (ChannelImpl channel : subscriptions)
362             channel.unsubscribe(this);
363         
364     }
365 
366     /* ------------------------------------------------------------ */
367     public void setBrowserId(String id)
368     {
369         if (_browserId!=null && !_browserId.equals(id))
370             _bayeux.clientOffBrowser(_browserId,_id);
371         _browserId=id;
372         if (_browserId!=null)
373             _bayeux.clientOnBrowser(_browserId,_id);
374     }
375 
376     /* ------------------------------------------------------------ */
377     public String getBrowserId()
378     {
379         return _browserId;
380     }
381 
382     /* ------------------------------------------------------------ */
383     @Override
384     public boolean equals(Object o)
385     {
386     	if (!(o instanceof Client))
387     		return false;
388     	return getId().equals(((Client)o).getId());
389     }
390 
391     /* ------------------------------------------------------------ */
392     /**
393      * Get the advice specific for this Client
394      * @return advice specific for this client or null
395      */
396     public JSON.Literal getAdvice()
397     {
398     	return _advice;
399     }
400 
401     /* ------------------------------------------------------------ */
402     /**
403      * @param advice specific for this client
404      */
405     public void setAdvice(JSON.Literal advice)
406     {
407     	_advice=advice;
408     }
409     
410     
411     /* ------------------------------------------------------------ */
412     public void addListener(ClientListener listener)
413     {
414     	synchronized(this)
415     	{
416     		if (listener instanceof MessageListener)
417     		{
418     			if (listener instanceof MessageListener.Synchronous)
419     				_syncMListeners=(MessageListener[])LazyList.addToArray(_syncMListeners,listener,MessageListener.class);
420     			else
421     				_asyncMListeners=(MessageListener[])LazyList.addToArray(_asyncMListeners,listener,MessageListener.class);
422     		}
423 
424     		if (listener instanceof RemoveListener)
425     			_rListeners=(RemoveListener[])LazyList.addToArray(_rListeners,listener,RemoveListener.class);
426     		
427     		if (listener instanceof QueueListener)
428     		    _qListeners=(QueueListener[])LazyList.addToArray(_qListeners,listener,QueueListener.class);
429                 
430                 if (listener instanceof DeliverListener)
431                     _dListeners=(DeliverListener[])LazyList.addToArray(_dListeners,listener,DeliverListener.class);
432     	}
433     }
434 
435     /* ------------------------------------------------------------ */
436     public void removeListener(ClientListener listener)
437     {
438     	synchronized(this)
439     	{
440     		if (listener instanceof MessageListener)
441     		{
442     			_syncMListeners=(MessageListener[])LazyList.removeFromArray(_syncMListeners,listener);
443     			_asyncMListeners=(MessageListener[])LazyList.removeFromArray(_asyncMListeners,listener);
444     		}
445 
446     		if (listener instanceof RemoveListener)
447     			_rListeners=(RemoveListener[])LazyList.removeFromArray(_rListeners,listener);
448     		
449     		if (listener instanceof QueueListener)
450     		    _qListeners=(QueueListener[])LazyList.removeFromArray(_qListeners,listener);
451     	}
452     }
453 
454     /* ------------------------------------------------------------ */
455     public long getTimeout() 
456     {
457     	return _timeout;
458     }
459 
460     /* ------------------------------------------------------------ */
461     public void setTimeout(long timeoutMS) 
462     {
463     	_timeout=timeoutMS;
464     }
465 
466     /* ------------------------------------------------------------ */
467     public void setMaxQueue(int maxQueue)
468     {
469         _maxQueue=maxQueue;
470     }
471     
472     /* ------------------------------------------------------------ */
473     public int getMaxQueue()
474     {
475         return _maxQueue;
476     }
477     
478     /* ------------------------------------------------------------ */
479     public Queue<Message> getQueue()
480     {
481         return _queue;
482     }
483 }