1 package org.cometd.oort;
2
3 import java.util.Collection;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.Set;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
9
10 import org.cometd.Channel;
11 import org.cometd.Client;
12 import org.cometd.Message;
13 import org.cometd.MessageListener;
14 import org.mortbay.component.AbstractLifeCycle;
15 import org.mortbay.util.LazyList;
16 import org.mortbay.util.MultiMap;
17 import org.mortbay.util.ajax.JSON;
18 import org.mortbay.util.ajax.JSON.Output;
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public class Seti extends AbstractLifeCycle
43 {
44 public final static String SETI_ATTRIBUTE="org.cometd.oort.Seti";
45 public final static String SETI_SHARD="seti.shard";
46
47 final String _setiId;
48 final String _setiChannelId;
49 final String _shardId;
50 final Oort _oort;
51 final Client _client;
52 final ShardLocation _allShardLocation;
53 final Channel _setiIdChannel;
54 final Channel _setiAllChannel;
55 final Channel _setiShardChannel;
56
57 final ConcurrentMap<String, Location> _uid2Location = new ConcurrentHashMap<String, Location>();
58
59
60 public Seti(Oort oort, String shardId)
61 {
62 _oort=oort;
63 _client = _oort.getBayeux().newClient("seti");
64 _setiId=_oort.getURL().replace("://","_").replace("/","_").replace(":","_");
65 _shardId=shardId;
66
67 _setiChannelId="/seti/"+_setiId;
68 _setiIdChannel=_oort.getBayeux().getChannel(_setiChannelId,true);
69 _setiIdChannel.setPersistent(true);
70 _oort.observeChannel(_setiIdChannel.getId());
71 _setiIdChannel.subscribe(_client);
72
73 _setiAllChannel=_oort.getBayeux().getChannel("/seti/ALL",true);
74 _setiAllChannel.setPersistent(true);
75 _oort.observeChannel(_setiAllChannel.getId());
76 _setiAllChannel.subscribe(_client);
77
78 _setiShardChannel=_oort.getBayeux().getChannel("/seti/"+shardId,true);
79 _setiShardChannel.setPersistent(true);
80 _oort.observeChannel(_setiShardChannel.getId());
81 _setiShardChannel.subscribe(_client);
82
83 _allShardLocation = new ShardLocation(_setiAllChannel);
84
85 }
86
87
88 protected void doStart()
89 throws Exception
90 {
91 super.doStart();
92 _client.addListener(new MessageListener()
93 {
94 public void deliver(Client from, Client to, Message msg)
95 {
96 receive(from,to,msg);
97 }
98 });
99 }
100
101
102 protected void doStop()
103 throws Exception
104 {
105 _client.disconnect();
106 }
107
108
109 public void associate(final String userId,final Client client)
110 {
111 _uid2Location.put(userId,new LocalLocation(client));
112 userId2Shard(userId).associate(userId);
113 }
114
115
116 public void disassociate(final String userId)
117 {
118 _uid2Location.remove(userId);
119 userId2Shard(userId).disassociate(userId);
120 }
121
122
123 public void sendMessage(final String toUser,final String toChannel,final Object message)
124 {
125 Location location = _uid2Location.get(toUser);
126 if (location==null)
127 location = userId2Shard(toUser);
128
129 location.sendMessage(toUser,toChannel,message);
130 }
131
132
133 public void sendMessage(final Collection<String> toUsers,final String toChannel,final Object message)
134 {
135
136 MultiMap shard2users = new MultiMap();
137 for (String userId:toUsers)
138 {
139 ShardLocation shard = userId2Shard(userId);
140 shard2users.add(shard,userId);
141 }
142
143
144 for (Map.Entry<ShardLocation,Object> entry : (Set<Map.Entry<ShardLocation,Object>>)shard2users.entrySet())
145 {
146
147
148
149
150
151
152
153 ShardLocation shard = entry.getKey();
154 Object lazyUsers = entry.getValue();
155
156 if (LazyList.size(lazyUsers)==1)
157 shard.sendMessage((String)lazyUsers,toChannel,message);
158 else
159 shard.sendMessage((List<String>)lazyUsers,toChannel,message);
160 }
161 }
162
163
164 protected ShardLocation userId2Shard(final String userId)
165 {
166 return _allShardLocation;
167 }
168
169
170 protected void receive(final Client from, final Client to, final Message msg)
171 {
172
173
174 if (!(msg.getData() instanceof Map))
175 return;
176
177
178 Map<String,Object> data = (Map<String,Object>)msg.getData();
179 final String toUid=(String)data.get("to");
180 final String fromUid=(String)data.get("from");
181 final Object message = data.get("message");
182 final String on = (String)data.get("on");
183
184
185 if (fromUid!=null)
186 {
187 if (on!=null)
188 {
189
190 _uid2Location.put(fromUid,new SetiLocation("/seti/"+on));
191 }
192 else
193 {
194 final String off = (String)data.get("off");
195 if (off!=null)
196 {
197
198 _uid2Location.remove(fromUid,new SetiLocation("/seti/"+off));
199 }
200 }
201 }
202
203
204 if (message!=null && toUid!=null)
205 {
206 final String toChannel=(String)data.get("channel");
207 Location location=_uid2Location.get(toUid);
208
209 if (location==null && _setiChannelId.equals(msg.getChannel()))
210
211 location =userId2Shard(toUid);
212
213 if (location!=null)
214 location.receive(toUid,toChannel,message);
215 }
216
217 }
218
219
220
221
222 private interface Location
223 {
224 public void sendMessage(String toUser,String toChannel,Object message);
225 public void receive(String toUser,String toChannel,Object message);
226 }
227
228
229
230
231 class LocalLocation implements Location
232 {
233 Client _client;
234
235 LocalLocation(Client client)
236 {
237 _client=client;
238 }
239
240 public void sendMessage(String toUser, String toChannel, Object message)
241 {
242 _client.deliver(Seti.this._client,toChannel,message,null);
243 }
244
245 public void receive(String toUser, String toChannel, Object message)
246 {
247 _client.deliver(Seti.this._client,toChannel,message,null);
248 }
249 }
250
251
252
253 class SetiLocation implements Location
254 {
255 Channel _channel;
256
257 SetiLocation(String channelId)
258 {
259 _channel=_oort._bayeux.getChannel(channelId,true);
260 }
261
262 SetiLocation(Channel channel)
263 {
264 _channel=channel;
265 }
266
267 public void sendMessage(String toUser, String toChannel, Object message)
268 {
269 _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
270 }
271
272 public void receive(String toUser, String toChannel, Object message)
273 {
274
275 }
276
277 public boolean equals(Object o)
278 {
279 return o instanceof SetiLocation &&
280 ((SetiLocation)o)._channel.equals(_channel);
281 }
282
283 public int hashCode()
284 {
285 return _channel.hashCode();
286 }
287 }
288
289
290
291 class ShardLocation implements Location
292 {
293 Channel _channel;
294
295 ShardLocation(String shardId)
296 {
297 _channel=_oort._bayeux.getChannel("/seti/"+shardId,true);
298
299 }
300
301 ShardLocation(Channel channel)
302 {
303 _channel=channel;
304 }
305
306 public void sendMessage(final Collection<String> toUsers, final String toChannel, final Object message)
307 {
308 _channel.publish(Seti.this._client,new SetiMessage(toUsers,toChannel,message),null);
309 }
310
311 public void sendMessage(String toUser, String toChannel, Object message)
312 {
313 _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
314 }
315
316 public void receive(String toUser, String toChannel, Object message)
317 {
318
319 }
320
321 public void associate(final String user)
322 {
323 _channel.publish(Seti.this._client,new SetiPresence(user,true),null);
324 }
325
326 public void disassociate(final String user)
327 {
328 _channel.publish(Seti.this._client,new SetiPresence(user,false),null);
329 }
330 }
331
332
333
334 class SetiMessage implements JSON.Convertible
335 {
336 String _toUser;
337 Collection<String> _toUsers;
338 String _toChannel;
339 Object _message;
340
341 SetiMessage(String toUser,String toChannel, Object message)
342 {
343 _toUser=toUser;
344 _toChannel=toChannel;
345 _message=message;
346 }
347
348 SetiMessage(Collection<String> toUsers,String toChannel, Object message)
349 {
350 _toUsers=toUsers;
351 _toChannel=toChannel;
352 _message=message;
353 }
354
355 public void fromJSON(Map object)
356 {
357 throw new UnsupportedOperationException();
358 }
359
360 public void toJSON(Output out)
361 {
362 if (_toUser!=null)
363 out.add("to",_toUser);
364 else if (_toUsers!=null)
365 out.add("to",_toUsers);
366 out.add("channel",_toChannel);
367 out.add("from",_setiId);
368 out.add("message",_message);
369 }
370 }
371
372
373
374 class SetiPresence implements JSON.Convertible
375 {
376 String _user;
377 boolean _on;
378
379 SetiPresence(String user,boolean on)
380 {
381 _user=user;
382 _on=on;
383 }
384
385 public void fromJSON(Map object)
386 {
387 throw new UnsupportedOperationException();
388 }
389
390 public void toJSON(Output out)
391 {
392 out.add("from",_user);
393 out.add(_on?"on":"off",_setiId);
394 }
395 }
396
397 }