View Javadoc

1   package org.mortbay.cometd;
2   
3   import java.io.IOException;
4   import java.io.Reader;
5   import java.util.HashMap;
6   import java.util.List;
7   import java.util.Map;
8   
9   import org.cometd.Bayeux;
10  import org.cometd.Message;
11  import org.mortbay.util.ArrayQueue;
12  import org.mortbay.util.StringMap;
13  import org.mortbay.util.ajax.JSON;
14  
15  public class MessagePool
16  {
17      final private ArrayQueue<MessageImpl> _messagePool;
18      final private ArrayQueue<JSON.ReaderSource> _readerPool;
19  
20      /* ------------------------------------------------------------ */
21      public MessagePool()
22      {
23          this(50);
24      }
25  
26      /* ------------------------------------------------------------ */
27      public MessagePool(int capacity)
28      {
29          _messagePool=new ArrayQueue<MessageImpl>(capacity,capacity);
30          _readerPool=new ArrayQueue<JSON.ReaderSource>(capacity,capacity);
31      }
32  
33      /* ------------------------------------------------------------ */
34      /**
35       * @return the {@link JSON} instance used to convert data and ext fields
36       */
37      public JSON getJSON()
38      {
39          return _json;
40      }
41  
42      /* ------------------------------------------------------------ */
43      /**
44       * @param json
45       *            the {@link JSON} instance used to convert data and ext fields
46       */
47      public void setJSON(JSON json)
48      {
49          _json=json;
50      }
51  
52      /* ------------------------------------------------------------ */
53      /**
54       * @return the {@link JSON} instance used to serialize and deserialize
55       *         bayeux bayeux messages
56       */
57      public JSON getMsgJSON()
58      {
59          return _msgJSON;
60      }
61  
62      /* ------------------------------------------------------------ */
63      /**
64       * @param msgJSON
65       *            the {@link JSON} instance used to serialize and deserialize
66       *            bayeux messages
67       */
68      public void setMsgJSON(JSON msgJSON)
69      {
70          _msgJSON=msgJSON;
71      }
72  
73      /* ------------------------------------------------------------ */
74      /**
75       * @return the {@link JSON} instance used to deserialize batches of bayeux
76       *         messages
77       */
78      public JSON getBatchJSON()
79      {
80          return _batchJSON;
81      }
82  
83      /* ------------------------------------------------------------ */
84      /**
85       * @param batchJSON
86       *            the {@link JSON} instance used to convert batches of bayeux
87       *            messages
88       */
89      public void setBatchJSON(JSON batchJSON)
90      {
91          _batchJSON=batchJSON;
92      }
93  
94      /* ------------------------------------------------------------ */
95      public MessageImpl newMessage()
96      {
97          MessageImpl message=_messagePool.poll();
98          if (message == null)
99          {
100             message=new MessageImpl(this);
101         }
102         message.incRef();
103         return message;
104     }
105 
106     /* ------------------------------------------------------------ */
107     public MessageImpl newMessage(Message associated)
108     {
109         MessageImpl message=_messagePool.poll();
110         if (message == null)
111         {
112             message=new MessageImpl(this);
113         }
114         message.incRef();
115         if (associated != null)
116             message.setAssociated(associated);
117         return message;
118     }
119 
120     /* ------------------------------------------------------------ */
121     void recycleMessage(MessageImpl message)
122     {
123         message.clear();
124         _messagePool.offer(message);
125     }
126 
127     /* ------------------------------------------------------------ */
128     public Message[] parse(Reader reader) throws IOException
129     {
130         JSON.ReaderSource source=_readerPool.poll();
131         if (source == null)
132             source=new JSON.ReaderSource(reader);
133         else
134             source.setReader(reader);
135 
136         Object batch=_batchJSON.parse(source);
137         _readerPool.offer(source);
138 
139         if (batch == null)
140             return new Message[0];
141         if (batch.getClass().isArray())
142             return (Message[])batch;
143         return new Message[]
144         {(Message)batch};
145     }
146 
147     /* ------------------------------------------------------------ */
148     public Message[] parse(String s) throws IOException
149     {
150         Object batch=_batchJSON.parse(new JSON.StringSource(s));
151         if (batch == null)
152             return new Message[0];
153         if (batch.getClass().isArray())
154             return (Message[])batch;
155         return new Message[]
156         {(Message)batch};
157     }
158 
159     /* ------------------------------------------------------------ */
160     public void parseTo(String fodder, List<Message> messages)
161     {
162         Object batch=_batchJSON.parse(new JSON.StringSource(fodder));
163         if (batch == null)
164             return;
165         if (batch.getClass().isArray())
166         {
167             Message[] msgs=(Message[])batch;
168             for (int m=0; m < msgs.length; m++)
169                 messages.add(msgs[m]);
170         }
171         else
172             messages.add((Message)batch);
173     }
174 
175     /* ------------------------------------------------------------ */
176     public String toString()
177     {
178         return "MessagePool:" + _messagePool.size() + "/" + _messagePool.getCapacity();
179 
180     }
181 
182     /* ------------------------------------------------------------ */
183     /* ------------------------------------------------------------ */
184     private StringMap _fieldStrings=new StringMap();
185     private StringMap _valueStrings=new StringMap();
186     {
187         _fieldStrings.put(Bayeux.ADVICE_FIELD,Bayeux.ADVICE_FIELD);
188         _fieldStrings.put(Bayeux.CHANNEL_FIELD,Bayeux.CHANNEL_FIELD);
189         _fieldStrings.put(Bayeux.CLIENT_FIELD,Bayeux.CLIENT_FIELD);
190         _fieldStrings.put(Bayeux.DATA_FIELD,Bayeux.DATA_FIELD);
191         _fieldStrings.put(Bayeux.ERROR_FIELD,Bayeux.ERROR_FIELD);
192         _fieldStrings.put(Bayeux.EXT_FIELD,Bayeux.EXT_FIELD);
193         _fieldStrings.put(Bayeux.ID_FIELD,Bayeux.ID_FIELD);
194         _fieldStrings.put(Bayeux.SUBSCRIPTION_FIELD,Bayeux.SUBSCRIPTION_FIELD);
195         _fieldStrings.put(Bayeux.SUCCESSFUL_FIELD,Bayeux.SUCCESSFUL_FIELD);
196         _fieldStrings.put(Bayeux.TIMESTAMP_FIELD,Bayeux.TIMESTAMP_FIELD);
197         _fieldStrings.put(Bayeux.TRANSPORT_FIELD,Bayeux.TRANSPORT_FIELD);
198         _fieldStrings.put("connectionType","connectionType");
199 
200         _valueStrings.put(Bayeux.META_CLIENT,Bayeux.META_CLIENT);
201         _valueStrings.put(Bayeux.META_CONNECT,Bayeux.META_CONNECT);
202         _valueStrings.put(Bayeux.META_DISCONNECT,Bayeux.META_DISCONNECT);
203         _valueStrings.put(Bayeux.META_HANDSHAKE,Bayeux.META_HANDSHAKE);
204         _valueStrings.put(Bayeux.META_SUBSCRIBE,Bayeux.META_SUBSCRIBE);
205         _valueStrings.put(Bayeux.META_UNSUBSCRIBE,Bayeux.META_UNSUBSCRIBE);
206         _valueStrings.put("long-polling","long-polling");
207     }
208 
209     /* ------------------------------------------------------------ */
210     /* ------------------------------------------------------------ */
211     private JSON _json=new JSON()
212     {
213         @Override
214         protected Map newMap()
215         {
216             return new HashMap(8);
217         }
218 
219         @Override
220         protected String toString(char[] buffer, int offset, int length)
221         {
222             Map.Entry entry=_valueStrings.getEntry(buffer,offset,length);
223             if (entry != null)
224                 return (String)entry.getValue();
225             String s=new String(buffer,offset,length);
226             return s;
227         }
228     };
229 
230     /* ------------------------------------------------------------ */
231     /* ------------------------------------------------------------ */
232     private JSON _msgJSON=new JSON()
233     {
234         @Override
235         protected Map newMap()
236         {
237             return newMessage();
238         }
239 
240         @Override
241         protected String toString(char[] buffer, int offset, int length)
242         {
243             Map.Entry entry=_fieldStrings.getEntry(buffer,offset,length);
244             if (entry != null)
245                 return (String)entry.getValue();
246 
247             String s=new String(buffer,offset,length);
248             return s;
249         }
250 
251         @Override
252         protected JSON contextFor(String field)
253         {
254             return _json;
255         }
256     };
257 
258     /* ------------------------------------------------------------ */
259     /* ------------------------------------------------------------ */
260     private JSON _batchJSON=new JSON()
261     {
262         @Override
263         protected Map newMap()
264         {
265             return newMessage();
266         }
267 
268         @Override
269         protected Object[] newArray(int size)
270         {
271             return new Message[size]; // todo recycle
272         }
273 
274         @Override
275         protected JSON contextFor(String field)
276         {
277             return _json;
278         }
279 
280         @Override
281         protected JSON contextForArray()
282         {
283             return _msgJSON;
284         }
285     };
286 
287 }