1 package org.cometd.oort;
2
3 import java.security.SecureRandom;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Random;
10 import java.util.Set;
11 import java.util.Timer;
12
13 import org.cometd.Bayeux;
14 import org.cometd.Client;
15 import org.cometd.Extension;
16 import org.cometd.Message;
17 import org.cometd.MessageListener;
18 import org.cometd.RemoveListener;
19 import org.mortbay.component.AbstractLifeCycle;
20 import org.mortbay.jetty.client.HttpClient;
21 import org.mortbay.log.Log;
22 import org.mortbay.util.ajax.JSON;
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 public class Oort extends AbstractLifeCycle
39 {
40 public final static String OORT_URL = "oort.url";
41 public final static String OORT_CLOUD = "oort.cloud";
42 public final static String OORT_CHANNELS = "oort.channels";
43 public final static String OORT_ATTRIBUTE = "org.cometd.oort.Oort";
44
45 protected String _url;
46 protected String _secret;
47 protected Bayeux _bayeux;
48 protected HttpClient _httpClient=new HttpClient();
49 protected Timer _timer=new Timer();
50 protected Random _random=new SecureRandom();
51 protected Client _oortClient;
52 protected List<MessageListener> _oortMessageListeners = new ArrayList<MessageListener>();
53
54 protected Map<String,OortComet> _knownCommets = new HashMap<String,OortComet>();
55 protected Set<String> _channels = new HashSet<String>();
56
57
58 Oort(String id,Bayeux bayeux)
59 {
60 _url=id;
61 _bayeux=bayeux;
62 _secret=Long.toHexString(_random.nextLong());
63
64 _oortClient=_bayeux.newClient("oort");
65 _oortClient.addListener(new RootOortClientListener());
66 _bayeux.getChannel("/oort/cloud",true).subscribe(_oortClient);
67 bayeux.addExtension(new OortExtension());
68 }
69
70
71 public Bayeux getBayeux()
72 {
73 return _bayeux;
74 }
75
76
77
78
79
80 public String getURL()
81 {
82 return _url;
83 }
84
85
86 public String getSecret()
87 {
88 return _secret;
89 }
90
91
92 protected void doStart() throws Exception
93 {
94 super.doStart();
95 _httpClient.start();
96 }
97
98
99
100
101
102
103
104
105
106
107
108 public OortComet observeComet(String cometUrl)
109 {
110 synchronized (this)
111 {
112 if (_url.equals(cometUrl))
113 return null;
114 OortComet comet = _knownCommets.get(cometUrl);
115 if (comet==null)
116 {
117 try
118 {
119 comet = new OortComet(this,cometUrl);
120 _knownCommets.put(cometUrl,comet);
121 comet.start();
122 }
123 catch(Exception e)
124 {
125 throw new IllegalStateException(e);
126 }
127 }
128 return comet;
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142 void observedComets(Set<String> comets)
143 {
144 synchronized (this)
145 {
146 Set<String> known=getKnownComets();
147 for (String comet : comets)
148 if (!_url.equals(comet))
149 observeComet(comet);
150 known=getKnownComets();
151
152 if (!comets.containsAll(known))
153 _bayeux.getChannel("/oort/cloud",true).publish(_oortClient,known,null);
154 }
155 }
156
157
158
159
160
161 public Set<String> getKnownComets()
162 {
163 synchronized (this)
164 {
165 Set<String> comets = new HashSet<String>(_knownCommets.keySet());
166 comets.add(_url);
167 return comets;
168 }
169 }
170
171
172
173
174
175
176
177
178
179
180
181 public void observeChannel(String channelId)
182 {
183 synchronized (this)
184 {
185 if (!_channels.contains(channelId))
186 {
187 _channels.add(channelId);
188 for (OortComet comet : _knownCommets.values())
189 if (comet.isHandshook())
190 comet.subscribe(channelId);
191 }
192 }
193 }
194
195
196
197
198
199
200
201 public void addOortMessageListener(MessageListener listener)
202 {
203 synchronized (this)
204 {
205 _oortMessageListeners.add(listener);
206 }
207 }
208
209
210
211
212
213
214
215 public boolean removeOortClientListener(MessageListener listener)
216 {
217 synchronized (this)
218 {
219 return _oortMessageListeners.remove(listener);
220 }
221 }
222
223
224 public boolean isOort(Client client)
225 {
226 return client==_oortClient;
227 }
228
229
230 public String toString()
231 {
232 return _url;
233 }
234
235
236
237
238
239
240
241
242
243
244 protected void oortHandshook(String oortUrl,String oortSecret,String clientId)
245 {
246 Log.info(this+": "+clientId+" is oort "+oortUrl);
247 if (!_knownCommets.containsKey(oortUrl))
248 observeComet(oortUrl);
249
250 Client client = _bayeux.getClient(clientId);
251
252 client.addExtension(new RemoteOortClientExtension());
253 }
254
255
256
257
258
259
260
261
262 protected class OortExtension implements Extension
263 {
264 public Message rcv(Client from, Message message)
265 {
266 return message;
267 }
268
269 public Message rcvMeta(Client from, Message message)
270 {
271 return message;
272 }
273
274 public Message send(Client from, Message message)
275 {
276 return message;
277 }
278
279 public Message sendMeta(Client from, Message message)
280 {
281 if (message.getChannel().equals(Bayeux.META_HANDSHAKE) && Boolean.TRUE.equals(message.get(Bayeux.SUCCESSFUL_FIELD)))
282 {
283 Message rcv = message.getAssociated();
284 System.err.println(_url+" --> "+rcv);
285
286 Map<String,Object> rcvExt = (Map<String,Object>)rcv.get("ext");
287 if (rcvExt!=null)
288 {
289 Map<String,Object> oort = (Map<String,Object>)rcvExt.get("oort");
290 if (oort!=null)
291 {
292 String cometUrl = (String)oort.get("comet");
293 String oortUrl = (String)oort.get("oort");
294
295 if (getURL().equals(cometUrl))
296 {
297 String oortSecret = (String)oort.get("oortSecret");
298
299 oortHandshook(oortUrl,oortSecret,message.getClientId());
300
301 Object ext=message.get("ext");
302
303 Map<String,Object> sndExt = (Map<String,Object>)((ext instanceof JSON.Literal)?JSON.parse(ext.toString()):ext);
304 if (sndExt==null)
305 sndExt = new HashMap<String,Object>();
306 oort.put("cometSecret",getSecret());
307 sndExt.put("oort",oort);
308 message.put("ext",sndExt);
309 }
310 }
311 }
312 System.err.println(_url+" <-- "+message);
313 }
314 return message;
315 }
316 }
317
318
319
320
321
322
323
324 protected class RemoteOortClientExtension implements Extension
325 {
326 public boolean queueMaxed(Client from, Client client, Message message)
327 {
328
329 boolean send = from!=_oortClient || message.getChannel().startsWith("/oort/");
330 return send;
331 }
332
333 public Message rcv(Client from, Message message)
334 {
335 return message;
336 }
337
338 public Message rcvMeta(Client from, Message message)
339 {
340 return message;
341 }
342
343 public Message send(Client from, Message message)
344 {
345
346 boolean send = !isOort(from) || message.getChannel().startsWith("/oort/");
347 return send?message:null;
348 }
349
350 public Message sendMeta(Client from, Message message)
351 {
352 return message;
353 }
354 }
355
356
357
358
359
360
361 protected class RootOortClientListener implements RemoveListener, MessageListener
362 {
363 public void removed(String clientId, boolean timeout)
364 {
365
366 }
367
368 public void deliver(Client fromClient, Client toClient, Message msg)
369 {
370 String channelId = msg.getChannel();
371 if (msg.getData()!=null)
372 {
373 if (channelId.equals("/oort/cloud") && msg.getData() instanceof Object[])
374 {
375 Object[] data = (Object[])msg.getData();
376 Set<String> comets = new HashSet<String>();
377 for (Object o:data)
378 comets.add(o.toString());
379 observedComets(comets);
380 }
381 }
382 }
383
384 }
385 }