View Javadoc

1   package org.cometd.oort;
2   
3   import java.io.IOException;
4   import java.util.HashMap;
5   import java.util.HashSet;
6   import java.util.Map;
7   import java.util.Set;
8   
9   import org.cometd.Channel;
10  import org.cometd.Client;
11  import org.cometd.Message;
12  import org.cometd.MessageListener;
13  import org.mortbay.cometd.MessageImpl;
14  import org.mortbay.cometd.client.BayeuxClient;
15  import org.mortbay.log.Log;
16  
17  /**
18   * Oort Comet client.
19   * <p>
20   * A BayeuxClient that connects the local Oort comet server to
21   * a remote Oort comet server.
22   *
23   */
24  public class OortComet extends BayeuxClient
25  {
26      protected Oort _oort;
27      protected String _cometUrl;
28      protected String _cometSecret;
29      protected boolean _connected;
30      protected boolean _handshook;
31  
32      OortComet(Oort oort,String cometUrl)
33      {
34          super(oort._httpClient,cometUrl,oort._timer);
35          _cometUrl=cometUrl;
36          _oort=oort;
37          addListener(new OortCometListener());
38      }
39  
40      public boolean isConnected()
41      {
42          return _connected;
43      }
44  
45      public boolean isHandshook()
46      {
47          return _handshook;
48      }
49  
50      @Override
51      protected String extendOut(String message)
52      {
53          if (message==BayeuxClient.Handshake.__HANDSHAKE)
54          {
55              try
56              {
57                  Message[] msg = _msgPool.parse(message);
58  
59                  Map<String,Object> oort = new HashMap<String,Object>();
60                  oort.put("oort",_oort.getURL());
61                  oort.put("oortSecret",_oort.getSecret());
62                  oort.put("comet",_cometUrl);
63                  Map<String,Object> ext = msg[0].getExt(true);
64                  ext.put("oort",oort);
65  
66                  super.extendOut(msg[0]);
67                  message= _msgPool.getJSON().toJSON(msg);
68  
69                  for (Message m:msg)
70                      if (m instanceof MessageImpl)
71                          ((MessageImpl)m).decRef();
72  
73              }
74              catch (IOException e)
75              {
76                  throw new IllegalArgumentException(e);
77              }
78  
79          }
80          else
81              message=super.extendOut(message);
82  
83          System.err.println(_oort.getURL()+" ==> "+message);
84          return message;
85      }
86  
87      @Override
88      protected void metaConnect(boolean success, Message message)
89      {
90          _connected=success;
91          super.metaConnect(success,message);
92      }
93  
94      @Override
95      protected void metaHandshake(boolean success, boolean reestablish, Message message)
96      {
97          synchronized (_oort)
98          {
99              _handshook=success;
100             super.metaHandshake(success,reestablish,message);
101             if (success)
102             {
103                 Map<String,Object> ext = (Map<String,Object>)message.get("ext");
104                 if (ext!=null)
105                 {
106                     Map<String,Object> oort = (Map<String,Object>)ext.get("oort");
107                     if (oort!=null)
108                     {
109                         _cometSecret=(String)oort.get("cometSecret");
110 
111                         startBatch();
112                         subscribe("/oort/cloud");
113                         for (String channel : _oort._channels)
114                             subscribe(channel);
115                         publish("/oort/cloud",_oort.getKnownComets(),_cometSecret);
116                         endBatch();
117                     }
118                 }
119                 System.err.println(_oort.getURL()+" <== "+ext);
120             }
121         }
122     }
123 
124     @Override
125     protected void metaPublishFail(Throwable e, Message[] messages)
126     {
127         // TODO Auto-generated method stub
128         super.metaPublishFail(e,messages);
129     }
130 
131 
132     protected class OortCometListener implements MessageListener
133     {
134         public void deliver(Client fromClient, Client toClient, Message msg)
135         {
136             String channelId = msg.getChannel();
137             if (msg.getData()!=null)
138             {
139                 if (channelId.startsWith("/oort/"))
140                 {
141                     if (channelId.equals("/oort/cloud"))
142                     {
143                         Object[] data = (Object[])msg.getData();
144                         Set<String> comets = new HashSet<String>();
145                         for (Object o:data)
146                             comets.add(o.toString());
147                         _oort.observedComets(comets);
148                     }
149 
150                     synchronized (_oort)
151                     {
152                         for( MessageListener listener : _oort._oortMessageListeners)
153                             notifyMessageListener(listener, fromClient, toClient, msg);
154                     }
155                 }
156                 else
157                 {
158                     Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false);
159                     if (channel!=null)
160                         channel.publish(_oort._oortClient,msg.getData(),msg.getId());
161                 }
162             }
163         }
164     }
165 
166     private void notifyMessageListener(MessageListener listener, Client from, Client to, Message message)
167     {
168         try
169         {
170             listener.deliver(from, to, message);
171         }
172         catch (Throwable x)
173         {
174             Log.debug(x);
175         }
176     }
177 }