View Javadoc

1   // ========================================================================
2   // Copyright 2008 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  package org.mortbay.cometd;
15  
16  import java.lang.reflect.Method;
17  import java.util.Map;
18  import java.util.concurrent.ConcurrentHashMap;
19  
20  import org.cometd.Bayeux;
21  import org.cometd.Channel;
22  import org.cometd.Client;
23  import org.cometd.Listener;
24  import org.cometd.Message;
25  import org.cometd.MessageListener;
26  import org.mortbay.component.LifeCycle;
27  import org.mortbay.log.Log;
28  import org.mortbay.thread.QueuedThreadPool;
29  import org.mortbay.thread.ThreadPool;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * Abstract Bayeux Service class. This is a base class to assist with the
34   * creation of server side @ link Bayeux} clients that provide services to
35   * remote Bayeux clients. The class provides a Bayeux {@link Client} and
36   * {@link Listener} together with convenience methods to map subscriptions to
37   * methods on the derived class and to send responses to those methods.
38   *
39   * <p>
40   * If a {@link #setThreadPool(ThreadPool)} is set, then messages are handled in
41   * their own threads. This is desirable if the handling of a message can take
42   * considerable time and it is desired not to hold up the delivering thread
43   * (typically a HTTP request handling thread).
44   *
45   * <p>
46   * If the BayeuxService is constructed asynchronously (the default), then
47   * messages are delivered unsynchronized and multiple simultaneous calls to
48   * handling methods may occur.
49   *
50   * <p>
51   * If the BayeuxService is constructed as a synchronous service, then message
52   * delivery is synchronized on the internal {@link Client} instances used and
53   * only a single call will be made to the handler method (unless a thread pool
54   * is used).
55   *
56   * @see MessageListener
57   * @author gregw
58   */
59  public abstract class BayeuxService
60  {
61      private String _name;
62      private Bayeux _bayeux;
63      private Client _client;
64      private Map<String,Method> _methods=new ConcurrentHashMap<String,Method>();
65      private ThreadPool _threadPool;
66      private MessageListener _listener;
67      private boolean _seeOwn=false;
68  
69      /* ------------------------------------------------------------ */
70      /**
71       * Instantiate the service. Typically the derived constructor will call @
72       * #subscribe(String, String)} to map subscriptions to methods.
73       *
74       * @param bayeux
75       *            The bayeux instance.
76       * @param name
77       *            The name of the service (used as client ID prefix).
78       */
79      public BayeuxService(Bayeux bayeux, String name)
80      {
81          this(bayeux,name,0,false);
82      }
83  
84      /* ------------------------------------------------------------ */
85      /**
86       * Instantiate the service. Typically the derived constructor will call @
87       * #subscribe(String, String)} to map subscriptions to methods.
88       *
89       * @param bayeux
90       *            The bayeux instance.
91       * @param name
92       *            The name of the service (used as client ID prefix).
93       * @param maxThreads
94       *            The size of a ThreadPool to create to handle messages.
95       */
96      public BayeuxService(Bayeux bayeux, String name, int maxThreads)
97      {
98          this(bayeux,name,maxThreads,false);
99      }
100 
101     /* ------------------------------------------------------------ */
102     /**
103      * Instantiate the service. Typically the derived constructor will call @
104      * #subscribe(String, String)} to map subscriptions to methods.
105      *
106      * @param bayeux
107      *            The bayeux instance.
108      * @param name
109      *            The name of the service (used as client ID prefix).
110      * @param maxThreads
111      *            The size of a ThreadPool to create to handle messages.
112      * @param synchronous
113      *            True if message delivery will be synchronized on the client.
114      */
115     public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous)
116     {
117         if (maxThreads > 0)
118             setThreadPool(new QueuedThreadPool(maxThreads));
119         _name=name;
120         _bayeux=bayeux;
121         _client=_bayeux.newClient(name);
122         _listener=(synchronous)?new SyncListen():new AsyncListen();
123         _client.addListener(_listener);
124 
125     }
126 
127     /* ------------------------------------------------------------ */
128     public Bayeux getBayeux()
129     {
130         return _bayeux;
131     }
132 
133     /* ------------------------------------------------------------ */
134     public Client getClient()
135     {
136         return _client;
137     }
138 
139     /* ------------------------------------------------------------ */
140     public ThreadPool getThreadPool()
141     {
142         return _threadPool;
143     }
144 
145     /* ------------------------------------------------------------ */
146     /**
147      * Set the threadpool. If the {@link ThreadPool} is a {@link LifeCycle},
148      * then it is started by this method.
149      * 
150      * @param pool
151      */
152     public void setThreadPool(ThreadPool pool)
153     {
154         try
155         {
156             if (pool instanceof LifeCycle)
157                 if (!((LifeCycle)pool).isStarted())
158                     ((LifeCycle)pool).start();
159         }
160         catch(Exception e)
161         {
162             throw new IllegalStateException(e);
163         }
164         _threadPool=pool;
165     }
166 
167     /* ------------------------------------------------------------ */
168     public boolean isSeeOwnPublishes()
169     {
170         return _seeOwn;
171     }
172 
173     /* ------------------------------------------------------------ */
174     public void setSeeOwnPublishes(boolean own)
175     {
176         _seeOwn=own;
177     }
178 
179     /* ------------------------------------------------------------ */
180     /**
181      * Subscribe to a channel. Subscribe to channel and map a method to handle
182      * received messages. The method must have a unique name and one of the
183      * following signatures:
184      * <ul>
185      * <li><code>myMethod(Client fromClient,Object data)</code></li>
186      * <li><code>myMethod(Client fromClient,Object data,String id)</code></li>
187      * <li>
188      * <code>myMethod(Client fromClient,String channel,Object data,String id)</code>
189      * </li>
190      * </li>
191      *
192      * The data parameter can be typed if the type of the data object published
193      * by the client is known (typically Map<String,Object>). If the type of the
194      * data parameter is {@link Message} then the message object itself is
195      * passed rather than just the data.
196      * <p>
197      * Typically a service will subscribe to a channel in the "/service/**"
198      * space which is not a broadcast channel. Messages published to these
199      * channels are only delivered to server side clients like this service.
200      *
201      * <p>
202      * Any object returned by a mapped subscription method is delivered to the
203      * calling client and not broadcast. If the method returns void or null,
204      * then no response is sent. A mapped subscription method may also call
205      * {@link #send(Client, String, Object, String)} to deliver a response
206      * message(s) to different clients and/or channels. It may also publish
207      * methods via the normal {@link Bayeux} API.
208      * <p>
209      *
210      *
211      * @param channelId
212      *            The channel to subscribe to
213      * @param methodName
214      *            The name of the method on this object to call when messages
215      *            are recieved.
216      */
217     protected void subscribe(String channelId, String methodName)
218     {
219         Method method=null;
220 
221         Class<?> c=this.getClass();
222         while(c != null && c != Object.class)
223         {
224             Method[] methods=c.getDeclaredMethods();
225             for (int i=methods.length; i-- > 0;)
226             {
227                 if (methodName.equals(methods[i].getName()))
228                 {
229                     if (method != null)
230                         throw new IllegalArgumentException("Multiple methods called '" + methodName + "'");
231                     method=methods[i];
232                 }
233             }
234             c=c.getSuperclass();
235         }
236 
237         if (method == null)
238             throw new NoSuchMethodError(methodName);
239         int params=method.getParameterTypes().length;
240         if (params < 2 || params > 4)
241             throw new IllegalArgumentException("Method '" + methodName + "' does not have 2or3 parameters");
242         if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
243             throw new IllegalArgumentException("Method '" + methodName + "' does not have Client as first parameter");
244 
245         Channel channel=_bayeux.getChannel(channelId,true);
246 
247         if (((ChannelImpl)channel).getChannelId().isWild())
248         {
249             final Method m=method;
250             Client wild_client=_bayeux.newClient(_name + "-wild");
251             wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
252             channel.subscribe(wild_client);
253         }
254         else
255         {
256             _methods.put(channelId,method);
257             channel.subscribe(_client);
258         }
259     }
260 
261     /* ------------------------------------------------------------ */
262     /**
263      * Send data to a individual client. The data passed is sent to the client
264      * as the "data" member of a message with the given channel and id. The
265      * message is not published on the channel and is thus not broadcast to all
266      * channel subscribers. However to the target client, the message appears as
267      * if it was broadcast.
268      * <p>
269      * Typcially this method is only required if a service method sends
270      * response(s) to channels other than the subscribed channel. If the
271      * response is to be sent to the subscribed channel, then the data can
272      * simply be returned from the subscription method.
273      *
274      * @param toClient
275      *            The target client
276      * @param onChannel
277      *            The channel the message is for
278      * @param data
279      *            The data of the message
280      * @param id
281      *            The id of the message (or null for a random id).
282      */
283     protected void send(Client toClient, String onChannel, Object data, String id)
284     {
285         toClient.deliver(getClient(),onChannel,data,id);
286     }
287 
288     /* ------------------------------------------------------------ */
289     /**
290      * Handle Exception. This method is called when a mapped subscription method
291      * throws and exception while handling a message.
292      *
293      * @param fromClient The remote client
294      * @param toClient The client associated with this BayeuxService
295      * @param msg The message
296      * @param th The exception thrown by the mapped subscription method
297      */
298     protected void exception(Client fromClient, Client toClient, Map<String,Object> msg, Throwable th)
299     {
300         System.err.println(msg);
301         th.printStackTrace();
302     }
303 
304     /* ------------------------------------------------------------ */
305     private void invoke(final Method method, final Client fromClient, final Client toClient, final Message msg)
306     {
307         if (_threadPool == null)
308         {
309             doInvoke(method,fromClient,toClient,msg);
310         }
311         else
312         {
313             ((MessageImpl)msg).incRef();
314             _threadPool.dispatch(new Runnable()
315             {
316                 public void run()
317                 {
318                     asyncDoInvoke(method, fromClient, toClient, msg);
319                 }
320             });
321         }
322     }
323 
324     protected void asyncDoInvoke(Method method, Client fromClient, Client toClient, Message msg)
325     {
326         try
327         {
328             doInvoke(method,fromClient,toClient,msg);
329         }
330         finally
331         {
332             ((MessageImpl)msg).decRef();
333         }
334     }
335 
336     private void doInvoke(Method method, Client fromClient, Client toClient, Message msg)
337     {
338         String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
339         Object data=msg.get(Bayeux.DATA_FIELD);
340         String id=msg.getId();
341 
342         if (method != null)
343         {
344             try
345             {
346                 Class<?>[] args=method.getParameterTypes();
347                 Object arg;
348                 if (args.length == 4)
349                     arg=Message.class.isAssignableFrom(args[2])?msg:data;
350                 else
351                     arg=Message.class.isAssignableFrom(args[1])?msg:data;
352 
353                 Object reply=null;
354                 switch(method.getParameterTypes().length)
355                 {
356                     case 2:
357                         reply=method.invoke(this,fromClient,arg);
358                         break;
359                     case 3:
360                         reply=method.invoke(this,fromClient,arg,id);
361                         break;
362                     case 4:
363                         reply=method.invoke(this,fromClient,channel,arg,id);
364                         break;
365                 }
366 
367                 if (reply != null)
368                     send(fromClient,channel,reply,id);
369             }
370             catch(Exception e)
371             {
372                 Log.debug("method",method);
373                 exception(fromClient,toClient,msg,e);
374             }
375             catch(Error e)
376             {
377                 Log.debug("method",method);
378                 exception(fromClient,toClient,msg,e);
379             }
380         }
381     }
382 
383     /* ------------------------------------------------------------ */
384     /* ------------------------------------------------------------ */
385     private class AsyncListen implements MessageListener, MessageListener.Asynchronous
386     {
387         public void deliver(Client fromClient, Client toClient, Message msg)
388         {
389             if (!_seeOwn && fromClient == getClient())
390                 return;
391             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
392             Method method=_methods.get(channel);
393             invoke(method,fromClient,toClient,msg);
394         }
395     }
396 
397     /* ------------------------------------------------------------ */
398     /* ------------------------------------------------------------ */
399     private class SyncListen implements MessageListener, MessageListener.Synchronous
400     {
401         public void deliver(Client fromClient, Client toClient, Message msg)
402         {
403             if (!_seeOwn && fromClient == getClient())
404                 return;
405             String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
406             Method method=_methods.get(channel);
407             invoke(method,fromClient,toClient,msg);
408         }
409     }
410 
411     /* ------------------------------------------------------------ */
412     /* ------------------------------------------------------------ */
413     private class SyncWildListen implements MessageListener, MessageListener.Synchronous
414     {
415         Client _client;
416         Method _method;
417 
418         public SyncWildListen(Client client, Method method)
419         {
420             _client=client;
421             _method=method;
422         }
423 
424         public void deliver(Client fromClient, Client toClient, Message msg)
425         {
426             if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
427                 return;
428             invoke(_method,fromClient,toClient,msg);
429         }
430     }
431 
432     /* ------------------------------------------------------------ */
433     /* ------------------------------------------------------------ */
434     private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
435     {
436         Client _client;
437         Method _method;
438 
439         public AsyncWildListen(Client client, Method method)
440         {
441             _client=client;
442             _method=method;
443         }
444 
445         public void deliver(Client fromClient, Client toClient, Message msg)
446         {
447             if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
448                 return;
449             invoke(_method,fromClient,toClient,msg);
450         }
451     }
452 }