View Javadoc

1   package org.mortbay.cometd.ext;
2   
3   import java.util.Map;
4   import java.util.Queue;
5   
6   import org.cometd.Bayeux;
7   import org.cometd.Client;
8   import org.cometd.Extension;
9   import org.cometd.Message;
10  import org.mortbay.cometd.MessageImpl;
11  
12  /**
13   * Acknowledged Message Client extension.
14   *
15   * Tracks the batch id of messages sent to a client.
16   *
17   */
18  public class AcknowledgedMessagesClientExtension implements Extension
19  {
20      private final Client _client;
21      private final ArrayIdQueue<Message> _unackedQueue;
22      private long _lastAck;
23  
24      public AcknowledgedMessagesClientExtension(Client client)
25      {
26          _client=client;
27          _unackedQueue=new ArrayIdQueue<Message>(8,16,client);
28          _unackedQueue.setCurrentId(1);
29      }
30  
31      public Message rcv(Client from, Message message)
32      {
33          return message;
34      }
35  
36      /**
37       * Handle received meta messages. Looks for meta/connect messages with
38       * ext/ack fields. If present, delete all messages that have been acked and
39       * requeue messages that have not been acked.
40       */
41      public Message rcvMeta(Client from, Message message)
42      {
43          if (message.getChannel().equals(Bayeux.META_CONNECT))
44          {
45              synchronized(_client)
46              {
47                  Map<String,Object> ext=message.getExt(false);
48                  if (ext != null)
49                  {
50                      Long acked=(Long)ext.get("ack");
51                      if (acked != null)
52                      {
53                          final int s=_unackedQueue.size();
54  
55                          if (acked<=_lastAck)
56                          {
57                              Queue<Message> clientQueue = from.getQueue();
58                              clientQueue.clear();
59                              for (int i=0; i < s; ++i)
60                              {
61                                  Message m = _unackedQueue.getUnsafe(i);
62                                  if (m instanceof MessageImpl)
63                                      ((MessageImpl)m).incRef();
64                                  clientQueue.add(m);
65                              }
66                          }
67                          _lastAck=acked;
68  
69                          // We have received an ack ID, so delete the acked messages.
70                          if (s > 0)
71                          {
72                              if (_unackedQueue.getAssociatedIdUnsafe(s - 1) <= acked)
73                              {
74                                  // we can just clear the queue
75                                  for (int i=0; i < s; i++)
76                                  {
77                                      final Message q=_unackedQueue.getUnsafe(i);
78                                      if (q instanceof MessageImpl)
79                                          ((MessageImpl)q).decRef();
80                                  }
81                                  _unackedQueue.clear();
82                              }
83                              else
84                              {
85                                  // we need to remove elements until we see
86                                  // unacked
87                                  for (int i=0; i < s; i++)
88                                  {
89                                      if (_unackedQueue.getAssociatedIdUnsafe(0) <= acked)
90                                      {
91                                          final Message q=_unackedQueue.remove();
92                                          if (q instanceof MessageImpl)
93                                              ((MessageImpl)q).decRef();
94                                          continue;
95                                      }
96                                      break;
97                                  }
98                              }
99                          }
100                     }
101                 }
102             }
103         }
104 
105         return message;
106     }
107 
108     public Message send(Client from, Message message)
109     {
110         synchronized(_client)
111         {
112             _unackedQueue.add(message);
113             // prevent the message from being erased
114             ((MessageImpl)message).incRef();
115         }
116         return message;
117     }
118 
119     public Message sendMeta(Client from, Message message)
120     {
121         if (message == null)
122             return message;
123 
124         if (message.getChannel() == null)
125             return message;
126 
127         if (message.getChannel().equals(Bayeux.META_CONNECT))
128         {
129             synchronized(_client)
130             {
131                 Map<String,Object> ext=message.getExt(true);
132                 ext.put("ack",_unackedQueue.getCurrentId());
133                 _unackedQueue.incrementCurrentId();
134             }
135         }
136         return message;
137     }
138 }