1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.cometd;
15
16 import java.lang.reflect.Method;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19
20 import org.cometd.Bayeux;
21 import org.cometd.Channel;
22 import org.cometd.Client;
23 import org.cometd.Listener;
24 import org.cometd.Message;
25 import org.cometd.MessageListener;
26 import org.mortbay.component.LifeCycle;
27 import org.mortbay.log.Log;
28 import org.mortbay.thread.QueuedThreadPool;
29 import org.mortbay.thread.ThreadPool;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public abstract class BayeuxService
60 {
61 private String _name;
62 private Bayeux _bayeux;
63 private Client _client;
64 private Map<String,Method> _methods=new ConcurrentHashMap<String,Method>();
65 private ThreadPool _threadPool;
66 private MessageListener _listener;
67 private boolean _seeOwn=false;
68
69
70
71
72
73
74
75
76
77
78
79 public BayeuxService(Bayeux bayeux, String name)
80 {
81 this(bayeux,name,0,false);
82 }
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public BayeuxService(Bayeux bayeux, String name, int maxThreads)
97 {
98 this(bayeux,name,maxThreads,false);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous)
116 {
117 if (maxThreads > 0)
118 setThreadPool(new QueuedThreadPool(maxThreads));
119 _name=name;
120 _bayeux=bayeux;
121 _client=_bayeux.newClient(name);
122 _listener=(synchronous)?new SyncListen():new AsyncListen();
123 _client.addListener(_listener);
124
125 }
126
127
128 public Bayeux getBayeux()
129 {
130 return _bayeux;
131 }
132
133
134 public Client getClient()
135 {
136 return _client;
137 }
138
139
140 public ThreadPool getThreadPool()
141 {
142 return _threadPool;
143 }
144
145
146
147
148
149
150
151
152 public void setThreadPool(ThreadPool pool)
153 {
154 try
155 {
156 if (pool instanceof LifeCycle)
157 if (!((LifeCycle)pool).isStarted())
158 ((LifeCycle)pool).start();
159 }
160 catch(Exception e)
161 {
162 throw new IllegalStateException(e);
163 }
164 _threadPool=pool;
165 }
166
167
168 public boolean isSeeOwnPublishes()
169 {
170 return _seeOwn;
171 }
172
173
174 public void setSeeOwnPublishes(boolean own)
175 {
176 _seeOwn=own;
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217 protected void subscribe(String channelId, String methodName)
218 {
219 Method method=null;
220
221 Class<?> c=this.getClass();
222 while(c != null && c != Object.class)
223 {
224 Method[] methods=c.getDeclaredMethods();
225 for (int i=methods.length; i-- > 0;)
226 {
227 if (methodName.equals(methods[i].getName()))
228 {
229 if (method != null)
230 throw new IllegalArgumentException("Multiple methods called '" + methodName + "'");
231 method=methods[i];
232 }
233 }
234 c=c.getSuperclass();
235 }
236
237 if (method == null)
238 throw new NoSuchMethodError(methodName);
239 int params=method.getParameterTypes().length;
240 if (params < 2 || params > 4)
241 throw new IllegalArgumentException("Method '" + methodName + "' does not have 2or3 parameters");
242 if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
243 throw new IllegalArgumentException("Method '" + methodName + "' does not have Client as first parameter");
244
245 Channel channel=_bayeux.getChannel(channelId,true);
246
247 if (((ChannelImpl)channel).getChannelId().isWild())
248 {
249 final Method m=method;
250 Client wild_client=_bayeux.newClient(_name + "-wild");
251 wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
252 channel.subscribe(wild_client);
253 }
254 else
255 {
256 _methods.put(channelId,method);
257 channel.subscribe(_client);
258 }
259 }
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283 protected void send(Client toClient, String onChannel, Object data, String id)
284 {
285 toClient.deliver(getClient(),onChannel,data,id);
286 }
287
288
289
290
291
292
293
294
295
296
297
298 protected void exception(Client fromClient, Client toClient, Map<String,Object> msg, Throwable th)
299 {
300 System.err.println(msg);
301 th.printStackTrace();
302 }
303
304
305 private void invoke(final Method method, final Client fromClient, final Client toClient, final Message msg)
306 {
307 if (_threadPool == null)
308 {
309 doInvoke(method,fromClient,toClient,msg);
310 }
311 else
312 {
313 ((MessageImpl)msg).incRef();
314 _threadPool.dispatch(new Runnable()
315 {
316 public void run()
317 {
318 asyncDoInvoke(method, fromClient, toClient, msg);
319 }
320 });
321 }
322 }
323
324 protected void asyncDoInvoke(Method method, Client fromClient, Client toClient, Message msg)
325 {
326 try
327 {
328 doInvoke(method,fromClient,toClient,msg);
329 }
330 finally
331 {
332 ((MessageImpl)msg).decRef();
333 }
334 }
335
336 private void doInvoke(Method method, Client fromClient, Client toClient, Message msg)
337 {
338 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
339 Object data=msg.get(Bayeux.DATA_FIELD);
340 String id=msg.getId();
341
342 if (method != null)
343 {
344 try
345 {
346 Class<?>[] args=method.getParameterTypes();
347 Object arg;
348 if (args.length == 4)
349 arg=Message.class.isAssignableFrom(args[2])?msg:data;
350 else
351 arg=Message.class.isAssignableFrom(args[1])?msg:data;
352
353 Object reply=null;
354 switch(method.getParameterTypes().length)
355 {
356 case 2:
357 reply=method.invoke(this,fromClient,arg);
358 break;
359 case 3:
360 reply=method.invoke(this,fromClient,arg,id);
361 break;
362 case 4:
363 reply=method.invoke(this,fromClient,channel,arg,id);
364 break;
365 }
366
367 if (reply != null)
368 send(fromClient,channel,reply,id);
369 }
370 catch(Exception e)
371 {
372 Log.debug("method",method);
373 exception(fromClient,toClient,msg,e);
374 }
375 catch(Error e)
376 {
377 Log.debug("method",method);
378 exception(fromClient,toClient,msg,e);
379 }
380 }
381 }
382
383
384
385 private class AsyncListen implements MessageListener, MessageListener.Asynchronous
386 {
387 public void deliver(Client fromClient, Client toClient, Message msg)
388 {
389 if (!_seeOwn && fromClient == getClient())
390 return;
391 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
392 Method method=_methods.get(channel);
393 invoke(method,fromClient,toClient,msg);
394 }
395 }
396
397
398
399 private class SyncListen implements MessageListener, MessageListener.Synchronous
400 {
401 public void deliver(Client fromClient, Client toClient, Message msg)
402 {
403 if (!_seeOwn && fromClient == getClient())
404 return;
405 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
406 Method method=_methods.get(channel);
407 invoke(method,fromClient,toClient,msg);
408 }
409 }
410
411
412
413 private class SyncWildListen implements MessageListener, MessageListener.Synchronous
414 {
415 Client _client;
416 Method _method;
417
418 public SyncWildListen(Client client, Method method)
419 {
420 _client=client;
421 _method=method;
422 }
423
424 public void deliver(Client fromClient, Client toClient, Message msg)
425 {
426 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
427 return;
428 invoke(_method,fromClient,toClient,msg);
429 }
430 }
431
432
433
434 private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
435 {
436 Client _client;
437 Method _method;
438
439 public AsyncWildListen(Client client, Method method)
440 {
441 _client=client;
442 _method=method;
443 }
444
445 public void deliver(Client fromClient, Client toClient, Message msg)
446 {
447 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
448 return;
449 invoke(_method,fromClient,toClient,msg);
450 }
451 }
452 }