View Javadoc

1   package org.cometd.oort;
2   
3   import java.security.SecureRandom;
4   import java.util.ArrayList;
5   import java.util.HashMap;
6   import java.util.HashSet;
7   import java.util.List;
8   import java.util.Map;
9   import java.util.Random;
10  import java.util.Set;
11  import java.util.Timer;
12  
13  import org.cometd.Bayeux;
14  import org.cometd.Client;
15  import org.cometd.Extension;
16  import org.cometd.Message;
17  import org.cometd.MessageListener;
18  import org.cometd.RemoveListener;
19  import org.mortbay.component.AbstractLifeCycle;
20  import org.mortbay.jetty.client.HttpClient;
21  import org.mortbay.log.Log;
22  import org.mortbay.util.ajax.JSON;
23  
24  /* ------------------------------------------------------------ */
25  /**
26   * Oort cluster of cometd servers.
27   * <p>
28   * This class maintains a collection of {@link OortComet} instances to each
29   * comet server identified by calls to {@link #observeComet(String)}. The Oort
30   * instance is created and configured by {@link OortServlet}. 
31   * <p>
32   * The key configuration parameter that must be set is the Oort URL, which is 
33   * full public URL to the cometd servlet, eg. http://myserver:8080/context/cometd
34   * See {@link OortServlet} for more configuration detail.<p>
35   * @author gregw
36   *
37   */
38  public class Oort extends AbstractLifeCycle
39  {
40      public final static String OORT_URL = "oort.url";
41      public final static String OORT_CLOUD = "oort.cloud";
42      public final static String OORT_CHANNELS = "oort.channels";
43      public final static String OORT_ATTRIBUTE = "org.cometd.oort.Oort";
44      
45      protected String _url;
46      protected String _secret;
47      protected Bayeux _bayeux;
48      protected HttpClient _httpClient=new HttpClient();
49      protected Timer _timer=new Timer();
50      protected Random _random=new SecureRandom();
51      protected Client _oortClient;
52      protected List<MessageListener> _oortMessageListeners = new ArrayList<MessageListener>();
53      
54      protected Map<String,OortComet> _knownCommets = new HashMap<String,OortComet>();
55      protected Set<String> _channels = new HashSet<String>();
56  
57      /* ------------------------------------------------------------ */
58      Oort(String id,Bayeux bayeux)
59      {
60          _url=id;
61          _bayeux=bayeux;
62          _secret=Long.toHexString(_random.nextLong());
63          
64          _oortClient=_bayeux.newClient("oort");
65          _oortClient.addListener(new RootOortClientListener());
66          _bayeux.getChannel("/oort/cloud",true).subscribe(_oortClient);
67          bayeux.addExtension(new OortExtension());
68      }
69  
70      /* ------------------------------------------------------------ */
71      public Bayeux getBayeux()
72      {
73          return _bayeux;
74      }
75      
76      /* ------------------------------------------------------------ */
77      /**
78       * @return The oublic absolute URL of the Oort cometd server.
79       */
80      public String getURL()
81      {
82          return _url;
83      }
84  
85      /* ------------------------------------------------------------ */
86      public String getSecret()
87      {
88          return _secret;
89      }
90  
91      /* ------------------------------------------------------------ */
92      protected void doStart() throws Exception
93      {
94          super.doStart();
95          _httpClient.start();
96      }
97  
98      /* ------------------------------------------------------------ */
99      /**
100      * Observe an Oort Comet server.
101      * <p>
102      * The the comet server is not already observed, start a {@link OortComet} 
103      * instance for it.
104      * 
105      * @param cometUrl
106      * @return The {@link OortComet} instance for the comet server.
107      */
108     public OortComet observeComet(String cometUrl)
109     {
110         synchronized (this)
111         {
112             if (_url.equals(cometUrl))
113                 return null;
114             OortComet comet = _knownCommets.get(cometUrl);
115             if (comet==null)
116             {
117                 try
118                 {
119                     comet = new OortComet(this,cometUrl);
120                     _knownCommets.put(cometUrl,comet);
121                     comet.start();
122                 }
123                 catch(Exception e)
124                 {
125                     throw new IllegalStateException(e);
126                 }
127             }
128             return comet;
129         }
130     }
131 
132     /* ------------------------------------------------------------ */
133     /**
134      * Pass observed comets. 
135      * <p>
136      * Called when another comet server publishes it's list of 
137      * known comets to the /oort/cloud channel.  If the list contains
138      * any unknown commets, then {@link #observeComet(String)} is 
139      * called for each.
140      * @param comets
141      */
142     void observedComets(Set<String> comets)
143     {
144         synchronized (this)
145         {
146             Set<String> known=getKnownComets();
147             for (String comet : comets)
148                 if (!_url.equals(comet))
149                     observeComet(comet);
150             known=getKnownComets();
151             
152             if (!comets.containsAll(known))
153                 _bayeux.getChannel("/oort/cloud",true).publish(_oortClient,known,null);
154         }
155     }
156 
157     /* ------------------------------------------------------------ */
158     /**
159      * @return The set of known Oort comet servers URLs.
160      */
161     public Set<String> getKnownComets()
162     {
163         synchronized (this)
164         {
165             Set<String> comets = new HashSet<String>(_knownCommets.keySet());
166             comets.add(_url);
167             return comets;
168         }
169     }
170 
171     /* ------------------------------------------------------------ */
172     /**
173      * Observer a channel.
174      * <p>
175      * Once observed, all {@link OortComet} instances subscribe
176      * to the channel and will repeat any messages published to
177      * the local channel (with loop prevention), so that the
178      * messages are distributed to all Oort comet servers.
179      * @param channelId
180      */
181     public void observeChannel(String channelId)
182     {
183         synchronized (this)
184         {
185             if (!_channels.contains(channelId))
186             {
187                 _channels.add(channelId);
188                 for (OortComet comet : _knownCommets.values())
189                     if (comet.isHandshook())
190                         comet.subscribe(channelId);
191             }
192         }
193     }
194 
195     /* ------------------------------------------------------------ */
196     /**
197      * Add a MessageListener that will receive all messages 
198      * published on /oort/* channels on connected OortComets 
199      * @param listener
200      */
201     public void addOortMessageListener(MessageListener listener)
202     {
203         synchronized (this)
204         {
205             _oortMessageListeners.add(listener);
206         }
207     }
208 
209     /* ------------------------------------------------------------ */
210     /**
211      * Remove an Oort message listener.
212      * @param listener
213      * @return true if the listener was removed.
214      */
215     public boolean removeOortClientListener(MessageListener listener)
216     {
217         synchronized (this)
218         {
219             return _oortMessageListeners.remove(listener);
220         }
221     }
222 
223     /* ------------------------------------------------------------ */
224     public boolean isOort(Client client)
225     {
226         return client==_oortClient;
227     }
228 
229     /* ------------------------------------------------------------ */
230     public String toString()
231     {
232         return _url;
233     }
234 
235     /* ------------------------------------------------------------ */
236     /**
237      * Called to register the details of a successful handshake with an
238      * Oort comet.  A {@link RemoteOortClientListener} instance is added to
239      * the local Oort client instance.
240      * @param oortUrl
241      * @param oortSecret
242      * @param clientId
243      */
244     protected void oortHandshook(String oortUrl,String oortSecret,String clientId)
245     {
246         Log.info(this+": "+clientId+" is oort "+oortUrl);
247         if (!_knownCommets.containsKey(oortUrl))
248             observeComet(oortUrl);
249         
250         Client client = _bayeux.getClient(clientId);
251         
252         client.addExtension(new RemoteOortClientExtension());
253     }
254 
255     /* ------------------------------------------------------------ */
256     /* ------------------------------------------------------------ */
257     /**
258      * Extension to detect incoming handshake from other Oort servers
259      * and to call {@link Oort#oortHandshook(String, String, String)}.
260      *
261      */
262     protected class OortExtension implements Extension
263     {
264         public Message rcv(Client from, Message message)
265         {
266             return message;
267         }
268 
269         public Message rcvMeta(Client from, Message message)
270         {
271             return message;
272         }
273 
274         public Message send(Client from, Message message)
275         {
276             return message;
277         }
278 
279         public Message sendMeta(Client from, Message message)
280         {
281             if (message.getChannel().equals(Bayeux.META_HANDSHAKE) && Boolean.TRUE.equals(message.get(Bayeux.SUCCESSFUL_FIELD)))
282             {
283                 Message rcv = message.getAssociated();
284                 System.err.println(_url+" --> "+rcv);
285                 
286                 Map<String,Object> rcvExt = (Map<String,Object>)rcv.get("ext");
287                 if (rcvExt!=null)
288                 {
289                     Map<String,Object> oort = (Map<String,Object>)rcvExt.get("oort");
290                     if (oort!=null)
291                     {
292                         String cometUrl = (String)oort.get("comet");
293                         String oortUrl = (String)oort.get("oort");
294 
295                         if (getURL().equals(cometUrl))
296                         {
297                             String oortSecret = (String)oort.get("oortSecret");
298                             
299                             oortHandshook(oortUrl,oortSecret,message.getClientId());
300                             
301                             Object ext=message.get("ext");
302                             
303                             Map<String,Object> sndExt = (Map<String,Object>)((ext instanceof JSON.Literal)?JSON.parse(ext.toString()):ext);
304                             if (sndExt==null)
305                                 sndExt = new HashMap<String,Object>();
306                             oort.put("cometSecret",getSecret());
307                             sndExt.put("oort",oort);
308                             message.put("ext",sndExt);
309                         }
310                     }
311                 }
312                 System.err.println(_url+" <-- "+message);
313             }
314             return message;
315         }   
316     }
317 
318     /* ------------------------------------------------------------ */
319     /* ------------------------------------------------------------ */
320     /**
321      * An Extension installed on clients for remote Oort servers
322      * that prevents publish loops.
323      */
324     protected class RemoteOortClientExtension implements Extension
325     {
326         public boolean queueMaxed(Client from, Client client, Message message)
327         {
328             // avoid loops
329             boolean send = from!=_oortClient || message.getChannel().startsWith("/oort/");
330             return send;
331         }
332 
333         public Message rcv(Client from, Message message)
334         {
335             return message;
336         }
337 
338         public Message rcvMeta(Client from, Message message)
339         {
340             return message;
341         }
342 
343         public Message send(Client from, Message message)
344         {
345             // avoid loops
346             boolean send = !isOort(from) || message.getChannel().startsWith("/oort/");
347             return send?message:null;
348         }
349 
350         public Message sendMeta(Client from, Message message)
351         {
352             return message;
353         }
354     }
355 
356     /* ------------------------------------------------------------ */
357     /* ------------------------------------------------------------ */
358     /**
359      * MessageListener that handles publishes to /oort/cloud
360      */
361     protected class RootOortClientListener implements RemoveListener, MessageListener
362     {
363         public void removed(String clientId, boolean timeout)
364         {
365             // TODO
366         }
367 
368         public void deliver(Client fromClient, Client toClient, Message msg)
369         {
370             String channelId = msg.getChannel();
371             if (msg.getData()!=null)
372             {
373                 if (channelId.equals("/oort/cloud") && msg.getData() instanceof Object[])
374                 {
375                     Object[] data = (Object[])msg.getData();
376                     Set<String> comets = new HashSet<String>();
377                     for (Object o:data)
378                         comets.add(o.toString());
379                     observedComets(comets);
380                 }   
381             }
382         }
383         
384     }
385 }