1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.cometd;
15
16 import java.lang.reflect.Method;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19
20 import org.cometd.Bayeux;
21 import org.cometd.Channel;
22 import org.cometd.Client;
23 import org.cometd.Listener;
24 import org.cometd.Message;
25 import org.cometd.MessageListener;
26 import org.mortbay.component.LifeCycle;
27 import org.mortbay.thread.QueuedThreadPool;
28 import org.mortbay.thread.ThreadPool;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public abstract class BayeuxService
54 {
55 private String _name;
56 private Bayeux _bayeux;
57 private Client _client;
58 private Map<String,Method> _methods = new ConcurrentHashMap<String,Method>();
59 private ThreadPool _threadPool;
60 private MessageListener _listener;
61 private boolean _seeOwn=false;
62
63
64
65
66
67
68
69
70 public BayeuxService(Bayeux bayeux,String name)
71 {
72 this(bayeux,name,0,false);
73 }
74
75
76
77
78
79
80
81
82
83 public BayeuxService(Bayeux bayeux,String name, int maxThreads)
84 {
85 this(bayeux,name,maxThreads,false);
86 }
87
88
89
90
91
92
93
94
95
96
97 public BayeuxService(Bayeux bayeux,String name, int maxThreads, boolean synchronous)
98 {
99 if (maxThreads>0)
100 setThreadPool(new QueuedThreadPool(maxThreads));
101 _name=name;
102 _bayeux=bayeux;
103 _client=_bayeux.newClient(name);
104 _listener=(synchronous)?new SyncListen():new AsyncListen();
105 _client.addListener(_listener);
106
107 }
108
109
110 public Bayeux getBayeux()
111 {
112 return _bayeux;
113 }
114
115
116 public Client getClient()
117 {
118 return _client;
119 }
120
121
122 public ThreadPool getThreadPool()
123 {
124 return _threadPool;
125 }
126
127
128
129
130
131
132
133
134 public void setThreadPool(ThreadPool pool)
135 {
136 try
137 {
138 if (pool instanceof LifeCycle)
139 if (!((LifeCycle)pool).isStarted())
140 ((LifeCycle)pool).start();
141 }
142 catch(Exception e)
143 {
144 throw new IllegalStateException(e);
145 }
146 _threadPool = pool;
147 }
148
149
150 public boolean isSeeOwnPublishes()
151 {
152 return _seeOwn;
153 }
154
155
156 public void setSeeOwnPublishes(boolean own)
157 {
158 _seeOwn = own;
159 }
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190 protected void subscribe(String channelId,String methodName)
191 {
192 Method method=null;
193
194 Class<?> c=this.getClass();
195 while (c!=null && c!=Object.class)
196 {
197 Method[] methods = c.getDeclaredMethods();
198 for (int i=methods.length;i-->0;)
199 {
200 if (methodName.equals(methods[i].getName()))
201 {
202 if (method!=null)
203 throw new IllegalArgumentException("Multiple methods called '"+methodName+"'");
204 method=methods[i];
205 }
206 }
207 c=c.getSuperclass();
208 }
209
210 if (method==null)
211 throw new NoSuchMethodError(methodName);
212 int params=method.getParameterTypes().length;
213 if (params<2 || params>4)
214 throw new IllegalArgumentException("Method '"+methodName+"' does not have 2or3 parameters");
215 if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
216 throw new IllegalArgumentException("Method '"+methodName+"' does not have Client as first parameter");
217
218 Channel channel=_bayeux.getChannel(channelId,true);
219
220 if (((ChannelImpl)channel).getChannelId().isWild())
221 {
222 final Method m=method;
223 Client wild_client=_bayeux.newClient(_name+"-wild");
224 wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
225 channel.subscribe(wild_client);
226 }
227 else
228 {
229 _methods.put(channelId,method);
230 channel.subscribe(_client);
231 }
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 protected void send(Client toClient, String onChannel, Object data, String id)
251 {
252 toClient.deliver(getClient(),onChannel,data,id);
253 }
254
255
256
257
258
259
260
261
262
263
264 protected void exception(Client fromClient, Client toClient, Map<String, Object> msg,Throwable th)
265 {
266 System.err.println(msg);
267 th.printStackTrace();
268 }
269
270
271
272 private void invoke(final Method method,final Client fromClient, final Client toClient, final Message msg)
273 {
274 if (_threadPool==null)
275 doInvoke(method,fromClient,toClient,msg);
276 else
277 {
278 _threadPool.dispatch(new Runnable()
279 {
280 public void run()
281 {
282 try
283 {
284 ((MessageImpl)msg).incRef();
285 doInvoke(method,fromClient,toClient,msg);
286 }
287 finally
288 {
289 ((MessageImpl)msg).decRef();
290 }
291 }
292 });
293 }
294 }
295
296
297 private void doInvoke(Method method,Client fromClient, Client toClient, Message msg)
298 {
299 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
300 Object data=msg.get(Bayeux.DATA_FIELD);
301 String id=msg.getId();
302
303 if (method!=null)
304 {
305 try
306 {
307 Class<?>[] args = method.getParameterTypes();
308 Object arg=Message.class.isAssignableFrom(args[1])?msg:data;
309
310 Object reply=null;
311 switch(method.getParameterTypes().length)
312 {
313 case 2:
314 reply=method.invoke(this,fromClient,arg);
315 break;
316 case 3:
317 reply=method.invoke(this,fromClient,arg,id);
318 break;
319 case 4:
320 reply=method.invoke(this,fromClient,channel,arg,id);
321 break;
322 }
323
324 if (reply!=null)
325 send(fromClient,channel,reply,id);
326 }
327 catch (Exception e)
328 {
329 System.err.println(method);
330 exception(fromClient,toClient,msg,e);
331 }
332 catch (Error e)
333 {
334 System.err.println(method);
335 exception(fromClient,toClient,msg,e);
336 }
337 }
338 }
339
340
341
342 private class AsyncListen implements MessageListener, MessageListener.Asynchronous
343 {
344 public void deliver(Client fromClient, Client toClient, Message msg)
345 {
346 if (!_seeOwn && fromClient==getClient())
347 return;
348 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
349 Method method=_methods.get(channel);
350 invoke(method,fromClient,toClient,msg);
351 }
352 }
353
354
355
356 private class SyncListen implements MessageListener, MessageListener.Synchronous
357 {
358 public void deliver(Client fromClient, Client toClient, Message msg)
359 {
360 if (!_seeOwn && fromClient==getClient())
361 return;
362 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
363 Method method=_methods.get(channel);
364 invoke(method,fromClient,toClient,msg);
365 }
366 }
367
368
369
370
371 private class SyncWildListen implements MessageListener, MessageListener.Synchronous
372 {
373 Client _client;
374 Method _method;
375
376 public SyncWildListen(Client client,Method method)
377 {
378 _client=client;
379 _method=method;
380 }
381 public void deliver(Client fromClient, Client toClient, Message msg)
382 {
383 if (!_seeOwn && (fromClient==_client || fromClient==getClient()))
384 return;
385 invoke(_method,fromClient,toClient,msg);
386 }
387 };
388
389
390
391
392 private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
393 {
394 Client _client;
395 Method _method;
396 public AsyncWildListen(Client client,Method method)
397 {
398 _client=client;
399 _method=method;
400 }
401 public void deliver(Client fromClient, Client toClient, Message msg)
402 {
403 if (!_seeOwn && (fromClient==_client || fromClient==getClient()))
404 return;
405 invoke(_method,fromClient,toClient,msg);
406 }
407 };
408
409 }
410