1 package org.mortbay.cometd;
2
3 import java.io.IOException;
4 import java.io.Reader;
5 import java.util.HashSet;
6 import java.util.List;
7 import java.util.Map;
8
9 import org.cometd.Bayeux;
10 import org.cometd.Message;
11 import org.mortbay.util.ArrayQueue;
12 import org.mortbay.util.StringMap;
13 import org.mortbay.util.ajax.JSON;
14
15
16 public class MessagePool
17 {
18 final private ArrayQueue<MessageImpl> _messagePool;
19 final private ArrayQueue<JSON.ReaderSource> _readerPool;
20
21
22 public MessagePool()
23 {
24 this(50);
25 }
26
27
28 public MessagePool(int capacity)
29 {
30 _messagePool=new ArrayQueue<MessageImpl>(capacity);
31 _readerPool=new ArrayQueue<JSON.ReaderSource>(capacity);
32 }
33
34
35
36
37
38 public JSON getJSON()
39 {
40 return _json;
41 }
42
43
44
45
46
47 public void setJSON(JSON json)
48 {
49 _json=json;
50 }
51
52
53
54
55
56 public JSON getMsgJSON()
57 {
58 return _msgJSON;
59 }
60
61
62
63
64
65 public void setMsgJSON(JSON msgJSON)
66 {
67 _msgJSON=msgJSON;
68 }
69
70
71
72
73
74 public JSON getBatchJSON()
75 {
76 return _batchJSON;
77 }
78
79
80
81
82
83 public void setBatchJSON(JSON batchJSON)
84 {
85 _batchJSON=batchJSON;
86 }
87
88
89
90 public MessageImpl newMessage()
91 {
92 MessageImpl message=_messagePool.poll();
93 if (message==null)
94 message=new MessageImpl(this);
95 message.incRef();
96 return message;
97 }
98
99
100 public MessageImpl newMessage(Message associated)
101 {
102 MessageImpl message=_messagePool.poll();
103 if (message==null)
104 message=new MessageImpl(this);
105 message.incRef();
106 if (associated!=null)
107 message.setAssociated(associated);
108 return message;
109 }
110
111
112 public void recycleMessage(MessageImpl message)
113 {
114 message.clear();
115 _messagePool.offer(message);
116 }
117
118
119 public Message[] parse(Reader reader) throws IOException
120 {
121 JSON.ReaderSource source =_readerPool.poll();
122 if (source==null)
123 source=new JSON.ReaderSource(reader);
124 else
125 source.setReader(reader);
126
127 Object batch=_batchJSON.parse(source);
128 _readerPool.offer(source);
129
130 if (batch==null)
131 return new Message[0];
132 if (batch.getClass().isArray())
133 return (Message[])batch;
134 return new Message[]{(Message)batch};
135 }
136
137
138 public Message[] parse(String s) throws IOException
139 {
140 Object batch=_batchJSON.parse(new JSON.StringSource(s));
141 if (batch==null)
142 return new Message[0];
143 if (batch.getClass().isArray())
144 return (Message[])batch;
145 return new Message[]{(Message)batch};
146 }
147
148
149 public void parseTo(String fodder, List<Message> messages)
150 {
151 Object batch=_batchJSON.parse(new JSON.StringSource(fodder));
152 if (batch==null)
153 return;
154 if (batch.getClass().isArray())
155 {
156 Message[] msgs=(Message[])batch;
157 for (int m=0;m<msgs.length;m++)
158 messages.add(msgs[m]);
159 }
160 else
161 messages.add((Message)batch);
162 }
163
164
165
166 private StringMap _fieldStrings = new StringMap();
167 private StringMap _valueStrings = new StringMap();
168 {
169 _fieldStrings.put(Bayeux.ADVICE_FIELD,Bayeux.ADVICE_FIELD);
170 _fieldStrings.put(Bayeux.CHANNEL_FIELD,Bayeux.CHANNEL_FIELD);
171 _fieldStrings.put(Bayeux.CLIENT_FIELD,Bayeux.CLIENT_FIELD);
172 _fieldStrings.put("connectionType","connectionType");
173 _fieldStrings.put(Bayeux.DATA_FIELD,Bayeux.DATA_FIELD);
174 _fieldStrings.put(Bayeux.ERROR_FIELD,Bayeux.ERROR_FIELD);
175 _fieldStrings.put(Bayeux.EXT_FIELD,Bayeux.EXT_FIELD);
176 _fieldStrings.put(Bayeux.ID_FIELD,Bayeux.ID_FIELD);
177 _fieldStrings.put(Bayeux.SUBSCRIPTION_FIELD,Bayeux.SUBSCRIPTION_FIELD);
178 _fieldStrings.put(Bayeux.SUCCESSFUL_FIELD,Bayeux.SUCCESSFUL_FIELD);
179 _fieldStrings.put(Bayeux.TIMESTAMP_FIELD,Bayeux.TIMESTAMP_FIELD);
180 _fieldStrings.put(Bayeux.TRANSPORT_FIELD,Bayeux.TRANSPORT_FIELD);
181
182 _valueStrings.put(Bayeux.META_CLIENT,Bayeux.META_CLIENT);
183 _valueStrings.put(Bayeux.META_CONNECT,Bayeux.META_CONNECT);
184 _valueStrings.put(Bayeux.META_DISCONNECT,Bayeux.META_DISCONNECT);
185 _valueStrings.put(Bayeux.META_HANDSHAKE,Bayeux.META_HANDSHAKE);
186 _valueStrings.put(Bayeux.META_SUBSCRIBE,Bayeux.META_SUBSCRIBE);
187 _valueStrings.put(Bayeux.META_UNSUBSCRIBE,Bayeux.META_UNSUBSCRIBE);
188 }
189
190
191
192
193 private JSON _json = new JSON()
194 {
195 @Override
196 protected String toString(char[] buffer, int offset, int length)
197 {
198 Map.Entry entry = _valueStrings.getEntry(buffer,offset,length);
199 if (entry!=null)
200 return (String)entry.getValue();
201 String s= new String(buffer,offset,length);
202 return s;
203 }
204 };
205
206
207
208 private JSON _msgJSON = new JSON()
209 {
210 @Override
211 protected Map newMap()
212 {
213 return newMessage();
214 }
215
216 @Override
217 protected String toString(char[] buffer, int offset, int length)
218 {
219 Map.Entry entry = _fieldStrings.getEntry(buffer,offset,length);
220 if (entry!=null)
221 return (String)entry.getValue();
222 String s= new String(buffer,offset,length);
223 return s;
224 }
225
226 @Override
227 protected JSON contextFor(String field)
228 {
229 return _json;
230 }
231 };
232
233
234
235 private JSON _batchJSON = new JSON()
236 {
237 @Override
238 protected Map newMap()
239 {
240 return newMessage();
241 }
242
243 @Override
244 protected Object[] newArray(int size)
245 {
246 return new Message[size];
247 }
248
249 @Override
250 protected JSON contextFor(String field)
251 {
252 return _json;
253 }
254
255 @Override
256 protected JSON contextForArray()
257 {
258 return _msgJSON;
259 }
260 };
261
262 }