1 package org.mortbay.cometd.ext;
2
3 import java.util.Map;
4 import java.util.Queue;
5
6 import org.cometd.Bayeux;
7 import org.cometd.Client;
8 import org.cometd.Extension;
9 import org.cometd.Message;
10 import org.mortbay.cometd.MessageImpl;
11
12
13
14
15
16
17
18 public class AcknowledgedMessagesClientExtension implements Extension
19 {
20 private final Client _client;
21 private final ArrayIdQueue<Message> _unackedQueue;
22 private long _lastAck;
23
24 public AcknowledgedMessagesClientExtension(Client client)
25 {
26 _client=client;
27 _unackedQueue=new ArrayIdQueue<Message>(8,16,client);
28 _unackedQueue.setCurrentId(1);
29 }
30
31 public Message rcv(Client from, Message message)
32 {
33 return message;
34 }
35
36
37
38
39
40
41 public Message rcvMeta(Client from, Message message)
42 {
43 if (message.getChannel().equals(Bayeux.META_CONNECT))
44 {
45 synchronized(_client)
46 {
47 Map<String,Object> ext=message.getExt(false);
48 if (ext != null)
49 {
50 Long acked=(Long)ext.get("ack");
51 if (acked != null)
52 {
53 final int s=_unackedQueue.size();
54
55 if (acked<=_lastAck)
56 {
57 Queue<Message> clientQueue = from.getQueue();
58 clientQueue.clear();
59 for (int i=0; i < s; ++i)
60 {
61 Message m = _unackedQueue.getUnsafe(i);
62 if (m instanceof MessageImpl)
63 ((MessageImpl)m).incRef();
64 clientQueue.add(m);
65 }
66 }
67 _lastAck=acked;
68
69
70 if (s > 0)
71 {
72 if (_unackedQueue.getAssociatedIdUnsafe(s - 1) <= acked)
73 {
74
75 for (int i=0; i < s; i++)
76 {
77 final Message q=_unackedQueue.getUnsafe(i);
78 if (q instanceof MessageImpl)
79 ((MessageImpl)q).decRef();
80 }
81 _unackedQueue.clear();
82 }
83 else
84 {
85
86
87 for (int i=0; i < s; i++)
88 {
89 if (_unackedQueue.getAssociatedIdUnsafe(0) <= acked)
90 {
91 final Message q=_unackedQueue.remove();
92 if (q instanceof MessageImpl)
93 ((MessageImpl)q).decRef();
94 continue;
95 }
96 break;
97 }
98 }
99 }
100 }
101 }
102 }
103 }
104
105 return message;
106 }
107
108 public Message send(Client from, Message message)
109 {
110 synchronized(_client)
111 {
112 _unackedQueue.add(message);
113
114 ((MessageImpl)message).incRef();
115 }
116 return message;
117 }
118
119 public Message sendMeta(Client from, Message message)
120 {
121 if (message == null)
122 return message;
123
124 if (message.getChannel() == null)
125 return message;
126
127 if (message.getChannel().equals(Bayeux.META_CONNECT))
128 {
129 synchronized(_client)
130 {
131 Map<String,Object> ext=message.getExt(true);
132 ext.put("ack",_unackedQueue.getCurrentId());
133 _unackedQueue.incrementCurrentId();
134 }
135 }
136 return message;
137 }
138 }