View Javadoc

1   package org.cometd.oort;
2   
3   import java.util.Collection;
4   import java.util.List;
5   import java.util.Map;
6   import java.util.Set;
7   import java.util.concurrent.ConcurrentHashMap;
8   import java.util.concurrent.ConcurrentMap;
9   
10  import org.cometd.Channel;
11  import org.cometd.Client;
12  import org.cometd.Message;
13  import org.cometd.MessageListener;
14  import org.mortbay.component.AbstractLifeCycle;
15  import org.mortbay.util.LazyList;
16  import org.mortbay.util.MultiMap;
17  import org.mortbay.util.ajax.JSON;
18  import org.mortbay.util.ajax.JSON.Output;
19  
20  /* ------------------------------------------------------------ */
21  /** The Search for Extra Terrestrial Intelligence.
22   * 
23   * Well in this case, just the search for a user logged onto an
24   * Cometd node in an Oort cluster.
25   * <p>
26   * Seti allows an application to maintain a mapping from userId to 
27   * comet client ID using the {@link #associate(String, Client)} and
28   * {@link #disassociate(String)} methods. Each cometd node keeps its
29   * own associate mapping for clients connected to it.
30   * <p>
31   * The {@link #sendMessage(Collection, String, Object)} and 
32   * {@link #sendMessage(String, String, Object)} methods may be
33   * used to send a message to user(s) anywhere in the Oort cluster
34   * and Seti organizes the search of the distributed associate
35   * maps in order to locate the user(s)
36   * <p>
37   * If users can be directed to shards of cometd servers, then
38   * each Seti instance must be told its shard ID and the {@link #userId2Shard(String)}
39   * method must be extended to map users to shards.
40   * 
41   */
42  public class Seti extends AbstractLifeCycle
43  {
44      public final static String SETI_ATTRIBUTE="org.cometd.oort.Seti";
45      public final static String SETI_SHARD="seti.shard";
46      
47      final String _setiId;
48      final String _setiChannelId;
49      final String _shardId;
50      final Oort _oort;
51      final Client _client;
52      final ShardLocation _allShardLocation;
53      final Channel _setiIdChannel;
54      final Channel _setiAllChannel;
55      final Channel _setiShardChannel;
56      
57      final ConcurrentMap<String, Location> _uid2Location = new ConcurrentHashMap<String, Location>();
58      
59      /* ------------------------------------------------------------ */
60      public Seti(Oort oort, String shardId)
61      {
62          _oort=oort;
63          _client = _oort.getBayeux().newClient("seti");
64          _setiId=_oort.getURL().replace("://","_").replace("/","_").replace(":","_");
65          _shardId=shardId;
66  
67          _setiChannelId="/seti/"+_setiId;
68          _setiIdChannel=_oort.getBayeux().getChannel(_setiChannelId,true);
69          _setiIdChannel.setPersistent(true);
70          _oort.observeChannel(_setiIdChannel.getId());
71          _setiIdChannel.subscribe(_client);
72          
73          _setiAllChannel=_oort.getBayeux().getChannel("/seti/ALL",true);
74          _setiAllChannel.setPersistent(true);
75          _oort.observeChannel(_setiAllChannel.getId());
76          _setiAllChannel.subscribe(_client);
77          
78          _setiShardChannel=_oort.getBayeux().getChannel("/seti/"+shardId,true);
79          _setiShardChannel.setPersistent(true);
80          _oort.observeChannel(_setiShardChannel.getId());
81          _setiShardChannel.subscribe(_client);
82          
83          _allShardLocation = new ShardLocation(_setiAllChannel);
84           
85      }
86  
87      /* ------------------------------------------------------------ */
88      protected void doStart()
89          throws Exception
90      {
91          super.doStart();
92          _client.addListener(new MessageListener()
93          {
94              public void deliver(Client from, Client to, Message msg)
95              {
96                  receive(from,to,msg);
97              }
98          });
99      }
100     
101     /* ------------------------------------------------------------ */
102     protected void doStop()
103         throws Exception
104     {
105         _client.disconnect();
106     }
107     
108     /* ------------------------------------------------------------ */
109     public void associate(final String userId,final Client client)
110     {
111         _uid2Location.put(userId,new LocalLocation(client));
112         userId2Shard(userId).associate(userId);
113     }
114 
115     /* ------------------------------------------------------------ */
116     public void disassociate(final String userId)
117     {
118         _uid2Location.remove(userId);
119         userId2Shard(userId).disassociate(userId);
120     }
121 
122     /* ------------------------------------------------------------ */
123     public void sendMessage(final String toUser,final String toChannel,final Object message)
124     {
125         Location location = _uid2Location.get(toUser);
126         if (location==null)
127             location = userId2Shard(toUser);
128         
129         location.sendMessage(toUser,toChannel,message);
130     }
131 
132     /* ------------------------------------------------------------ */
133     public void sendMessage(final Collection<String> toUsers,final String toChannel,final Object message)
134     {
135         // break toUsers in to shards
136         MultiMap shard2users = new MultiMap();
137         for (String userId:toUsers)
138         {       
139             ShardLocation shard = userId2Shard(userId);
140             shard2users.add(shard,userId);
141         }
142         
143         // for each shard
144         for (Map.Entry<ShardLocation,Object> entry : (Set<Map.Entry<ShardLocation,Object>>)shard2users.entrySet())
145         {
146             // TODO, we could look at all users in shard to see if we
147             // know a setiId for each, and if so, break the user list
148             // up into a message for each seti-id. BUT it is probably 
149             // more efficient just to send to the entire shard (unless
150             // the number of nodes in the shard is greater than the
151             // number of users).
152             
153             ShardLocation shard = entry.getKey();
154             Object lazyUsers = entry.getValue();
155             
156             if (LazyList.size(lazyUsers)==1)
157                 shard.sendMessage((String)lazyUsers,toChannel,message);
158             else
159                 shard.sendMessage((List<String>)lazyUsers,toChannel,message);
160         }
161     }
162     
163     /* ------------------------------------------------------------ */
164     protected ShardLocation userId2Shard(final String userId)
165     {
166         return _allShardLocation;
167     }
168 
169     /* ------------------------------------------------------------ */
170     protected void receive(final Client from, final Client to, final Message msg)
171     {
172         //System.err.println("SETI "+_oort+":: "+msg);
173 
174         if (!(msg.getData() instanceof Map))
175             return;
176         
177         // extract the message details
178         Map<String,Object> data = (Map<String,Object>)msg.getData();
179         final String toUid=(String)data.get("to");
180         final String fromUid=(String)data.get("from");
181         final Object message = data.get("message");
182         final String on = (String)data.get("on");
183         
184         // Handle any client locations contained in the message
185         if (fromUid!=null)
186         {
187             if (on!=null)
188             {
189                 //System.err.println(_oort+":: "+fromUid+" on "+on);
190                 _uid2Location.put(fromUid,new SetiLocation("/seti/"+on));
191             }
192             else 
193             {
194                 final String off = (String)data.get("off");
195                 if (off!=null)
196                 {
197                     //System.err.println(_oort+":: "+fromUid+" off ");
198                     _uid2Location.remove(fromUid,new SetiLocation("/seti/"+off));
199                 }
200             }
201         }
202         
203         // deliver message
204         if (message!=null && toUid!=null)
205         {
206             final String toChannel=(String)data.get("channel");
207             Location location=_uid2Location.get(toUid);
208             
209             if (location==null && _setiChannelId.equals(msg.getChannel()))
210                 // was sent to this node, so escalate to the shard.
211                 location =userId2Shard(toUid);
212             
213             if (location!=null)
214                 location.receive(toUid,toChannel,message);
215         }
216         
217     }
218 
219 
220     /* ------------------------------------------------------------ */
221     /* ------------------------------------------------------------ */
222     private interface Location
223     {
224         public void sendMessage(String toUser,String toChannel,Object message);
225         public void receive(String toUser,String toChannel,Object message);
226     }
227     
228 
229     /* ------------------------------------------------------------ */
230     /* ------------------------------------------------------------ */
231     class LocalLocation implements Location
232     {
233         Client _client;
234         
235         LocalLocation(Client client)
236         {
237             _client=client;
238         }
239 
240         public void sendMessage(String toUser, String toChannel, Object message)
241         {
242             _client.deliver(Seti.this._client,toChannel,message,null);
243         }
244 
245         public void receive(String toUser, String toChannel, Object message)
246         {
247             _client.deliver(Seti.this._client,toChannel,message,null);
248         }
249     }
250 
251     /* ------------------------------------------------------------ */
252     /* ------------------------------------------------------------ */
253     class SetiLocation implements Location
254     {
255         Channel _channel;
256 
257         SetiLocation(String channelId)
258         {
259             _channel=_oort._bayeux.getChannel(channelId,true);
260         }
261         
262         SetiLocation(Channel channel)
263         {
264             _channel=channel;
265         }
266         
267         public void sendMessage(String toUser, String toChannel, Object message)
268         {
269             _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
270         }
271 
272         public void receive(String toUser, String toChannel, Object message)
273         {
274             
275         }
276 
277         public boolean equals(Object o)
278         {
279             return o instanceof SetiLocation &&
280             ((SetiLocation)o)._channel.equals(_channel);
281         }
282         
283         public int hashCode()
284         {
285             return _channel.hashCode();
286         }
287     }
288 
289     /* ------------------------------------------------------------ */
290     /* ------------------------------------------------------------ */
291     class ShardLocation implements Location
292     {
293         Channel _channel;
294         
295         ShardLocation(String shardId)
296         {
297             _channel=_oort._bayeux.getChannel("/seti/"+shardId,true);
298             
299         }
300         
301         ShardLocation(Channel channel)
302         {
303             _channel=channel;
304         }
305         
306         public void sendMessage(final Collection<String> toUsers, final String toChannel, final Object message)
307         {
308             _channel.publish(Seti.this._client,new SetiMessage(toUsers,toChannel,message),null);
309         }
310 
311         public void sendMessage(String toUser, String toChannel, Object message)
312         {
313             _channel.publish(Seti.this._client,new SetiMessage(toUser,toChannel,message),null);
314         }
315         
316         public void receive(String toUser, String toChannel, Object message)
317         {
318             
319         }
320         
321         public void associate(final String user)
322         {
323             _channel.publish(Seti.this._client,new SetiPresence(user,true),null);
324         }
325         
326         public void disassociate(final String user)
327         {
328             _channel.publish(Seti.this._client,new SetiPresence(user,false),null);
329         }
330     }
331 
332     /* ------------------------------------------------------------ */
333     /* ------------------------------------------------------------ */
334     class SetiMessage implements JSON.Convertible
335     {
336         String _toUser;
337         Collection<String> _toUsers;
338         String _toChannel;
339         Object _message;
340 
341         SetiMessage(String toUser,String toChannel, Object message)
342         {
343             _toUser=toUser;
344             _toChannel=toChannel;
345             _message=message;
346         }
347         
348         SetiMessage(Collection<String> toUsers,String toChannel, Object message)
349         {
350             _toUsers=toUsers;
351             _toChannel=toChannel;
352             _message=message;
353         }
354         
355         public void fromJSON(Map object)
356         {
357             throw new UnsupportedOperationException();
358         }
359 
360         public void toJSON(Output out)
361         {
362             if (_toUser!=null)
363                 out.add("to",_toUser);
364             else if (_toUsers!=null)
365                 out.add("to",_toUsers);
366             out.add("channel",_toChannel);
367             out.add("from",_setiId);
368             out.add("message",_message);
369         }   
370     }
371     
372     /* ------------------------------------------------------------ */
373     /* ------------------------------------------------------------ */
374     class SetiPresence implements JSON.Convertible
375     {
376         String _user;
377         boolean _on;
378 
379         SetiPresence(String user,boolean on)
380         {
381             _user=user;
382             _on=on;
383         }
384         
385         public void fromJSON(Map object)
386         {
387             throw new UnsupportedOperationException();
388         }
389 
390         public void toJSON(Output out)
391         {
392             out.add("from",_user);
393             out.add(_on?"on":"off",_setiId);
394         }
395     }
396     
397 }