1 package org.cometd.oort;
2
3 import java.io.IOException;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.Map;
7 import java.util.Set;
8
9 import org.cometd.Channel;
10 import org.cometd.Client;
11 import org.cometd.Message;
12 import org.cometd.MessageListener;
13 import org.mortbay.cometd.MessageImpl;
14 import org.mortbay.cometd.client.BayeuxClient;
15 import org.mortbay.log.Log;
16
17
18
19
20
21
22
23
24 public class OortComet extends BayeuxClient
25 {
26 protected Oort _oort;
27 protected String _cometUrl;
28 protected String _cometSecret;
29 protected boolean _connected;
30 protected boolean _handshook;
31
32 OortComet(Oort oort,String cometUrl)
33 {
34 super(oort._httpClient,cometUrl,oort._timer);
35 _cometUrl=cometUrl;
36 _oort=oort;
37 addListener(new OortCometListener());
38 }
39
40 public boolean isConnected()
41 {
42 return _connected;
43 }
44
45 public boolean isHandshook()
46 {
47 return _handshook;
48 }
49
50 @Override
51 protected String extendOut(String message)
52 {
53 if (message==BayeuxClient.Handshake.__HANDSHAKE)
54 {
55 try
56 {
57 Message[] msg = _msgPool.parse(message);
58
59 Map<String,Object> oort = new HashMap<String,Object>();
60 oort.put("oort",_oort.getURL());
61 oort.put("oortSecret",_oort.getSecret());
62 oort.put("comet",_cometUrl);
63 Map<String,Object> ext = msg[0].getExt(true);
64 ext.put("oort",oort);
65
66 super.extendOut(msg[0]);
67 message= _msgPool.getJSON().toJSON(msg);
68
69 for (Message m:msg)
70 if (m instanceof MessageImpl)
71 ((MessageImpl)m).decRef();
72
73 }
74 catch (IOException e)
75 {
76 throw new IllegalArgumentException(e);
77 }
78
79 }
80 else
81 message=super.extendOut(message);
82
83 System.err.println(_oort.getURL()+" ==> "+message);
84 return message;
85 }
86
87 @Override
88 protected void metaConnect(boolean success, Message message)
89 {
90 _connected=success;
91 super.metaConnect(success,message);
92 }
93
94 @Override
95 protected void metaHandshake(boolean success, boolean reestablish, Message message)
96 {
97 synchronized (_oort)
98 {
99 _handshook=success;
100 super.metaHandshake(success,reestablish,message);
101 if (success)
102 {
103 Map<String,Object> ext = (Map<String,Object>)message.get("ext");
104 if (ext!=null)
105 {
106 Map<String,Object> oort = (Map<String,Object>)ext.get("oort");
107 if (oort!=null)
108 {
109 _cometSecret=(String)oort.get("cometSecret");
110
111 startBatch();
112 subscribe("/oort/cloud");
113 for (String channel : _oort._channels)
114 subscribe(channel);
115 publish("/oort/cloud",_oort.getKnownComets(),_cometSecret);
116 endBatch();
117 }
118 }
119 System.err.println(_oort.getURL()+" <== "+ext);
120 }
121 }
122 }
123
124 @Override
125 protected void metaPublishFail(Throwable e, Message[] messages)
126 {
127
128 super.metaPublishFail(e,messages);
129 }
130
131
132 protected class OortCometListener implements MessageListener
133 {
134 public void deliver(Client fromClient, Client toClient, Message msg)
135 {
136 String channelId = msg.getChannel();
137 if (msg.getData()!=null)
138 {
139 if (channelId.startsWith("/oort/"))
140 {
141 if (channelId.equals("/oort/cloud"))
142 {
143 Object[] data = (Object[])msg.getData();
144 Set<String> comets = new HashSet<String>();
145 for (Object o:data)
146 comets.add(o.toString());
147 _oort.observedComets(comets);
148 }
149
150 synchronized (_oort)
151 {
152 for( MessageListener listener : _oort._oortMessageListeners)
153 notifyMessageListener(listener, fromClient, toClient, msg);
154 }
155 }
156 else
157 {
158 Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false);
159 if (channel!=null)
160 channel.publish(_oort._oortClient,msg.getData(),msg.getId());
161 }
162 }
163 }
164 }
165
166 private void notifyMessageListener(MessageListener listener, Client from, Client to, Message message)
167 {
168 try
169 {
170 listener.deliver(from, to, message);
171 }
172 catch (Throwable x)
173 {
174 Log.debug(x);
175 }
176 }
177 }