1 package org.mortbay.cometd;
2
3 import java.io.IOException;
4 import java.io.Reader;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8
9 import org.cometd.Bayeux;
10 import org.cometd.Message;
11 import org.mortbay.util.ArrayQueue;
12 import org.mortbay.util.StringMap;
13 import org.mortbay.util.ajax.JSON;
14
15 public class MessagePool
16 {
17 final private ArrayQueue<MessageImpl> _messagePool;
18 final private ArrayQueue<JSON.ReaderSource> _readerPool;
19
20
21 public MessagePool()
22 {
23 this(50);
24 }
25
26
27 public MessagePool(int capacity)
28 {
29 _messagePool=new ArrayQueue<MessageImpl>(capacity,capacity);
30 _readerPool=new ArrayQueue<JSON.ReaderSource>(capacity,capacity);
31 }
32
33
34
35
36
37 public JSON getJSON()
38 {
39 return _json;
40 }
41
42
43
44
45
46
47 public void setJSON(JSON json)
48 {
49 _json=json;
50 }
51
52
53
54
55
56
57 public JSON getMsgJSON()
58 {
59 return _msgJSON;
60 }
61
62
63
64
65
66
67
68 public void setMsgJSON(JSON msgJSON)
69 {
70 _msgJSON=msgJSON;
71 }
72
73
74
75
76
77
78 public JSON getBatchJSON()
79 {
80 return _batchJSON;
81 }
82
83
84
85
86
87
88
89 public void setBatchJSON(JSON batchJSON)
90 {
91 _batchJSON=batchJSON;
92 }
93
94
95 public MessageImpl newMessage()
96 {
97 MessageImpl message=_messagePool.poll();
98 if (message == null)
99 {
100 message=new MessageImpl(this);
101 }
102 message.incRef();
103 return message;
104 }
105
106
107 public MessageImpl newMessage(Message associated)
108 {
109 MessageImpl message=_messagePool.poll();
110 if (message == null)
111 {
112 message=new MessageImpl(this);
113 }
114 message.incRef();
115 if (associated != null)
116 message.setAssociated(associated);
117 return message;
118 }
119
120
121 void recycleMessage(MessageImpl message)
122 {
123 message.clear();
124 _messagePool.offer(message);
125 }
126
127
128 public Message[] parse(Reader reader) throws IOException
129 {
130 JSON.ReaderSource source=_readerPool.poll();
131 if (source == null)
132 source=new JSON.ReaderSource(reader);
133 else
134 source.setReader(reader);
135
136 Object batch=_batchJSON.parse(source);
137 _readerPool.offer(source);
138
139 if (batch == null)
140 return new Message[0];
141 if (batch.getClass().isArray())
142 return (Message[])batch;
143 return new Message[]
144 {(Message)batch};
145 }
146
147
148 public Message[] parse(String s) throws IOException
149 {
150 Object batch=_batchJSON.parse(new JSON.StringSource(s));
151 if (batch == null)
152 return new Message[0];
153 if (batch.getClass().isArray())
154 return (Message[])batch;
155 return new Message[]
156 {(Message)batch};
157 }
158
159
160 public void parseTo(String fodder, List<Message> messages)
161 {
162 Object batch=_batchJSON.parse(new JSON.StringSource(fodder));
163 if (batch == null)
164 return;
165 if (batch.getClass().isArray())
166 {
167 Message[] msgs=(Message[])batch;
168 for (int m=0; m < msgs.length; m++)
169 messages.add(msgs[m]);
170 }
171 else
172 messages.add((Message)batch);
173 }
174
175
176 public String toString()
177 {
178 return "MessagePool:" + _messagePool.size() + "/" + _messagePool.getCapacity();
179
180 }
181
182
183
184 private StringMap _fieldStrings=new StringMap();
185 private StringMap _valueStrings=new StringMap();
186 {
187 _fieldStrings.put(Bayeux.ADVICE_FIELD,Bayeux.ADVICE_FIELD);
188 _fieldStrings.put(Bayeux.CHANNEL_FIELD,Bayeux.CHANNEL_FIELD);
189 _fieldStrings.put(Bayeux.CLIENT_FIELD,Bayeux.CLIENT_FIELD);
190 _fieldStrings.put(Bayeux.DATA_FIELD,Bayeux.DATA_FIELD);
191 _fieldStrings.put(Bayeux.ERROR_FIELD,Bayeux.ERROR_FIELD);
192 _fieldStrings.put(Bayeux.EXT_FIELD,Bayeux.EXT_FIELD);
193 _fieldStrings.put(Bayeux.ID_FIELD,Bayeux.ID_FIELD);
194 _fieldStrings.put(Bayeux.SUBSCRIPTION_FIELD,Bayeux.SUBSCRIPTION_FIELD);
195 _fieldStrings.put(Bayeux.SUCCESSFUL_FIELD,Bayeux.SUCCESSFUL_FIELD);
196 _fieldStrings.put(Bayeux.TIMESTAMP_FIELD,Bayeux.TIMESTAMP_FIELD);
197 _fieldStrings.put(Bayeux.TRANSPORT_FIELD,Bayeux.TRANSPORT_FIELD);
198 _fieldStrings.put("connectionType","connectionType");
199
200 _valueStrings.put(Bayeux.META_CLIENT,Bayeux.META_CLIENT);
201 _valueStrings.put(Bayeux.META_CONNECT,Bayeux.META_CONNECT);
202 _valueStrings.put(Bayeux.META_DISCONNECT,Bayeux.META_DISCONNECT);
203 _valueStrings.put(Bayeux.META_HANDSHAKE,Bayeux.META_HANDSHAKE);
204 _valueStrings.put(Bayeux.META_SUBSCRIBE,Bayeux.META_SUBSCRIBE);
205 _valueStrings.put(Bayeux.META_UNSUBSCRIBE,Bayeux.META_UNSUBSCRIBE);
206 _valueStrings.put("long-polling","long-polling");
207 }
208
209
210
211 private JSON _json=new JSON()
212 {
213 @Override
214 protected Map newMap()
215 {
216 return new HashMap(8);
217 }
218
219 @Override
220 protected String toString(char[] buffer, int offset, int length)
221 {
222 Map.Entry entry=_valueStrings.getEntry(buffer,offset,length);
223 if (entry != null)
224 return (String)entry.getValue();
225 String s=new String(buffer,offset,length);
226 return s;
227 }
228 };
229
230
231
232 private JSON _msgJSON=new JSON()
233 {
234 @Override
235 protected Map newMap()
236 {
237 return newMessage();
238 }
239
240 @Override
241 protected String toString(char[] buffer, int offset, int length)
242 {
243 Map.Entry entry=_fieldStrings.getEntry(buffer,offset,length);
244 if (entry != null)
245 return (String)entry.getValue();
246
247 String s=new String(buffer,offset,length);
248 return s;
249 }
250
251 @Override
252 protected JSON contextFor(String field)
253 {
254 return _json;
255 }
256 };
257
258
259
260 private JSON _batchJSON=new JSON()
261 {
262 @Override
263 protected Map newMap()
264 {
265 return newMessage();
266 }
267
268 @Override
269 protected Object[] newArray(int size)
270 {
271 return new Message[size];
272 }
273
274 @Override
275 protected JSON contextFor(String field)
276 {
277 return _json;
278 }
279
280 @Override
281 protected JSON contextForArray()
282 {
283 return _msgJSON;
284 }
285 };
286
287 }