View Javadoc

1   // ========================================================================
2   // Copyright 2008 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  package org.mortbay.cometd;
15  
16  import java.lang.reflect.Method;
17  import java.util.Map;
18  import java.util.concurrent.ConcurrentHashMap;
19  
20  import org.cometd.Bayeux;
21  import org.cometd.Channel;
22  import org.cometd.Client;
23  import org.cometd.Listener;
24  import org.cometd.Message;
25  import org.cometd.MessageListener;
26  import org.mortbay.component.LifeCycle;
27  import org.mortbay.thread.QueuedThreadPool;
28  import org.mortbay.thread.ThreadPool;
29  
30  
31  /* ------------------------------------------------------------ */
32  /** Abstract Bayeux Service class.
33   * This is a base class to assist with the creation of server side {@ link Bayeux} 
34   * clients that provide services to remote Bayeux clients.   The class provides
35   * a Bayeux {@link Client} and {@link Listener} together with convenience methods to map
36   * subscriptions to methods on the derived class and to send responses to those methods.
37   * 
38   * <p>If a {@link #set_threadPool(ThreadPool)} is set, then messages are handled in their 
39   * own threads.  This is desirable if the handling of a message can take considerable time and 
40   * it is desired not to hold up the delivering thread (typically a HTTP request handling thread).
41   * 
42   * <p>If the BayeuxService is constructed asynchronously (the default), then messages are
43   * delivered unsynchronized and multiple simultaneous calls to handling methods may occur.
44   * 
45   * <p>If the BayeuxService is constructed as a synchronous service, then message delivery
46   * is synchronized on the internal {@link Client} instances used and only a single call will
47   * be made to the handler method (unless a thread pool is used).
48   *
49   * @see MessageListener
50   * @author gregw
51   *
52   */
53  public abstract class BayeuxService 
54  {
55      private String _name;
56      private Bayeux _bayeux;
57      private Client _client;
58      private Map<String,Method> _methods = new ConcurrentHashMap<String,Method>();
59      private ThreadPool _threadPool;
60      private MessageListener _listener;
61      private boolean _seeOwn=false;
62      
63      /* ------------------------------------------------------------ */
64      /** Instantiate the service.
65       * Typically the derived constructor will call {@ #subscribe(String, String)} to 
66       * map subscriptions to methods.
67       * @param bayeux The bayeux instance.
68       * @param name The name of the service (used as client ID prefix).
69       */
70      public BayeuxService(Bayeux bayeux,String name)
71      {
72          this(bayeux,name,0,false);
73      }
74  
75      /* ------------------------------------------------------------ */
76      /** Instantiate the service.
77       * Typically the derived constructor will call {@ #subscribe(String, String)} to 
78       * map subscriptions to methods.
79       * @param bayeux The bayeux instance.
80       * @param name The name of the service (used as client ID prefix).
81       * @param maxThreads The size of a ThreadPool to create to handle messages.
82       */
83      public BayeuxService(Bayeux bayeux,String name, int maxThreads)
84      {
85          this(bayeux,name,maxThreads,false);
86      }
87      
88      /* ------------------------------------------------------------ */
89      /** Instantiate the service.
90       * Typically the derived constructor will call {@ #subscribe(String, String)} to 
91       * map subscriptions to methods.
92       * @param bayeux The bayeux instance.
93       * @param name The name of the service (used as client ID prefix).
94       * @param maxThreads The size of a ThreadPool to create to handle messages.
95       * @param synchronous True if message delivery will be synchronized on the client.
96       */
97      public BayeuxService(Bayeux bayeux,String name, int maxThreads, boolean synchronous)
98      {
99          if (maxThreads>0)
100             setThreadPool(new QueuedThreadPool(maxThreads));
101         _name=name;
102         _bayeux=bayeux;
103         _client=_bayeux.newClient(name); 
104         _listener=(synchronous)?new SyncListen():new  AsyncListen();
105         _client.addListener(_listener);
106         
107     }
108 
109     /* ------------------------------------------------------------ */
110     public Bayeux getBayeux()
111     {
112         return _bayeux;
113     }
114 
115     /* ------------------------------------------------------------ */
116     public Client getClient()
117     {
118         return _client;
119     }
120 
121     /* ------------------------------------------------------------ */
122     public ThreadPool getThreadPool()
123     {
124         return _threadPool;
125     }
126 
127     /* ------------------------------------------------------------ */
128     /**
129      * Set the threadpool.
130      * If the {@link ThreadPool} is a {@link LifeCycle}, then it is started by this method.
131      * 
132      * @param pool 
133      */
134     public void setThreadPool(ThreadPool pool)
135     {
136         try
137         {
138             if (pool instanceof LifeCycle)
139                 if (!((LifeCycle)pool).isStarted())
140                     ((LifeCycle)pool).start();
141         }
142         catch(Exception e)
143         {
144             throw new IllegalStateException(e);
145         }
146         _threadPool = pool;
147     }
148     
149     /* ------------------------------------------------------------ */
150     public boolean isSeeOwnPublishes()
151     {
152         return _seeOwn;
153     }
154 
155     /* ------------------------------------------------------------ */
156     public void setSeeOwnPublishes(boolean own)
157     {
158         _seeOwn = own;
159     }
160 
161     /* ------------------------------------------------------------ */
162     /** Subscribe to a channel.
163      * Subscribe to channel and map a method to handle received messages.
164      * The method must have a unique name and one of the following signatures:<ul>
165      * <li><code>myMethod(Client fromClient,Object data)</code></li>
166      * <li><code>myMethod(Client fromClient,Object data,String id)</code></li>
167      * <li><code>myMethod(Client fromClient,String channel,Object data,String id)</code></li>
168      * </li>
169      * 
170      * The data parameter can be typed if
171      * the type of the data object published by the client is known (typically 
172      * Map<String,Object>). If the type of the data parameter is {@link Message} then
173      * the message object itself is passed rather than just the data.
174      * <p>
175      * Typically a service will subscribe to a channel in the "/service/**" space
176      * which is not a broadcast channel.  Messages published to these channels are
177      * only delivered to server side clients like this service.  
178      * 
179      * <p>Any object returned by a mapped subscription method is delivered to the 
180      * calling client and not broadcast. If the method returns void or null, then 
181      * no response is sent. A mapped subscription method may also call {@link #send(Client, String, Object, String)}
182      * to deliver a response message(s) to different clients and/or channels. It may
183      * also publish methods via the normal {@link Bayeux} API.
184      * <p>
185      * 
186      * 
187      * @param channelId The channel to subscribe to
188      * @param methodName The name of the method on this object to call when messages are recieved.
189      */
190     protected void subscribe(String channelId,String methodName)
191     {
192         Method method=null;
193         
194         Class<?> c=this.getClass();
195         while (c!=null && c!=Object.class)
196         {
197             Method[] methods = c.getDeclaredMethods();
198             for (int i=methods.length;i-->0;)
199             {
200                 if (methodName.equals(methods[i].getName()))
201                 {
202                     if (method!=null)
203                         throw new IllegalArgumentException("Multiple methods called '"+methodName+"'");
204                     method=methods[i];
205                 }
206             }
207             c=c.getSuperclass();
208         }
209         
210         if (method==null)
211             throw new NoSuchMethodError(methodName);
212         int params=method.getParameterTypes().length;
213         if (params<2 || params>4)
214             throw new IllegalArgumentException("Method '"+methodName+"' does not have 2or3 parameters");
215         if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
216             throw new IllegalArgumentException("Method '"+methodName+"' does not have Client as first parameter");
217 
218         Channel channel=_bayeux.getChannel(channelId,true);
219 
220         if (((ChannelImpl)channel).getChannelId().isWild())
221         { 
222             final Method m=method;
223             Client wild_client=_bayeux.newClient(_name+"-wild");
224             wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
225             channel.subscribe(wild_client);
226         }
227         else
228         {
229             _methods.put(channelId,method);
230             channel.subscribe(_client);
231         }
232     }
233 
234     /* ------------------------------------------------------------ */
235     /** Send data to a individual client.
236      * The data passed is sent to the client as the "data" member of a message
237      * with the given channel and id.  The message is not published on the channel and is
238      * thus not broadcast to all channel subscribers.  However to the target client, the
239      * message appears as if it was broadcast.
240      * <p>
241      * Typcially this method is only required if a service method sends response(s) to 
242      * channels other than the subscribed channel. If the response is to be sent to the subscribed
243      * channel, then the data can simply be returned from the subscription method.
244      * 
245      * @param toClient The target client
246      * @param onChannel The channel the message is for
247      * @param data The data of the message
248      * @param id The id of the message (or null for a random id).
249      */
250     protected void send(Client toClient, String onChannel, Object data, String id)
251     {
252         toClient.deliver(getClient(),onChannel,data,id);
253     }    
254 
255     /* ------------------------------------------------------------ */
256     /** Handle Exception.
257      * This method is called when a mapped subscription method throws
258      * and exception while handling a message.
259      * @param fromClient
260      * @param toClient
261      * @param msg
262      * @param th
263      */
264     protected void exception(Client fromClient, Client toClient, Map<String, Object> msg,Throwable th)
265     {
266         System.err.println(msg);
267         th.printStackTrace();
268     }
269 
270 
271     /* ------------------------------------------------------------ */
272     private void invoke(final Method method,final Client fromClient, final Client toClient, final Message msg)
273     {
274         if (_threadPool==null)
275             doInvoke(method,fromClient,toClient,msg);
276         else
277         {
278             _threadPool.dispatch(new Runnable()
279             {
280                 public void run()
281                 {
282                     try
283                     {
284                         ((MessageImpl)msg).incRef();
285                         doInvoke(method,fromClient,toClient,msg);
286                     }
287                     finally
288                     {
289                         ((MessageImpl)msg).decRef();
290                     }
291                 }   
292             });
293         }
294     }
295     
296     /* ------------------------------------------------------------ */
297     private void doInvoke(Method method,Client fromClient, Client toClient, Message msg)
298     {
299         String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
300         Object data=msg.get(Bayeux.DATA_FIELD);
301         String id=msg.getId();
302         
303         if (method!=null)
304         {
305             try
306             {
307                 Class<?>[] args = method.getParameterTypes();
308                 Object arg=Message.class.isAssignableFrom(args[1])?msg:data;
309                 
310                 Object reply=null;
311                 switch(method.getParameterTypes().length)
312                 {
313                     case 2:
314                         reply=method.invoke(this,fromClient,arg);
315                         break;
316                     case 3:
317                         reply=method.invoke(this,fromClient,arg,id);
318                         break;
319                     case 4:
320                         reply=method.invoke(this,fromClient,channel,arg,id);
321                         break;
322                 }
323                 
324                 if (reply!=null)
325                     send(fromClient,channel,reply,id);
326             }
327             catch (Exception e)
328             {
329                 System.err.println(method);
330                 exception(fromClient,toClient,msg,e);
331             }
332             catch (Error e)
333             {
334                 System.err.println(method);
335                 exception(fromClient,toClient,msg,e);
336             }
337         }
338     }
339 
340     /* ------------------------------------------------------------ */
341     /* ------------------------------------------------------------ */
342     private class AsyncListen implements MessageListener, MessageListener.Asynchronous
343     {
344         public void deliver(Client fromClient, Client toClient, Message msg)
345         {
346             if (!_seeOwn && fromClient==getClient())
347                 return;
348             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
349             Method method=_methods.get(channel);
350             invoke(method,fromClient,toClient,msg);
351         }
352     }
353     
354     /* ------------------------------------------------------------ */
355     /* ------------------------------------------------------------ */
356     private class SyncListen implements MessageListener, MessageListener.Synchronous
357     {
358         public void deliver(Client fromClient, Client toClient, Message msg)
359         {
360             if (!_seeOwn && fromClient==getClient())
361                 return;
362             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
363             Method method=_methods.get(channel);
364             invoke(method,fromClient,toClient,msg);
365         }
366     }
367     
368 
369     /* ------------------------------------------------------------ */
370     /* ------------------------------------------------------------ */
371     private class SyncWildListen implements MessageListener, MessageListener.Synchronous
372     {
373         Client _client;
374         Method _method;
375         
376         public SyncWildListen(Client client,Method method)
377         {
378             _client=client;
379             _method=method;
380         }
381         public void deliver(Client fromClient, Client toClient, Message msg)
382         {
383             if (!_seeOwn && (fromClient==_client || fromClient==getClient()))
384                 return;
385             invoke(_method,fromClient,toClient,msg);
386         }
387     };
388     
389 
390     /* ------------------------------------------------------------ */
391     /* ------------------------------------------------------------ */
392     private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
393     {
394         Client _client;
395         Method _method;
396         public AsyncWildListen(Client client,Method method)
397         {
398             _client=client;
399             _method=method;
400         }
401         public void deliver(Client fromClient, Client toClient, Message msg)
402         {
403             if (!_seeOwn && (fromClient==_client || fromClient==getClient()))
404                 return;
405             invoke(_method,fromClient,toClient,msg);
406         }
407     };
408 
409 }
410