1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd.continuation;
16
17 import javax.servlet.http.HttpServletResponse;
18
19 import org.mortbay.cometd.ClientImpl;
20 import org.mortbay.log.Log;
21 import org.mortbay.thread.Timeout;
22 import org.mortbay.util.ajax.Continuation;
23
24
25
26
27
28
29
30
31
32
33
34 public class ContinuationClient extends ClientImpl
35 {
36 private final ContinuationBayeux _bayeux;
37 private final Timeout.Task _intervalTimeoutTask;
38 private final Timeout.Task _lazyTimeoutTask;
39 private long _accessed;
40 private volatile Continuation _continuation;
41 private volatile boolean _lazyResuming;
42
43
44 protected ContinuationClient(ContinuationBayeux bayeux)
45 {
46 super(bayeux);
47 _bayeux=bayeux;
48
49 if (isLocal())
50 {
51 _intervalTimeoutTask=null;
52 _lazyTimeoutTask=null;
53 }
54 else
55 {
56
57 _intervalTimeoutTask=new Timeout.Task()
58 {
59 @Override
60 public void expired()
61 {
62 remove(true);
63 }
64
65 @Override
66 public String toString()
67 {
68 return "T-" + ContinuationClient.this.toString();
69 }
70 };
71
72
73 _lazyTimeoutTask=new Timeout.Task()
74 {
75 @Override
76 public void expired()
77 {
78 _lazyResuming=false;
79 if (hasMessages())
80 resume();
81 }
82
83 @Override
84 public String toString()
85 {
86 return "L-" + ContinuationClient.this.toString();
87 }
88 };
89
90 _bayeux.startTimeout(_intervalTimeoutTask,_bayeux.getMaxInterval());
91 }
92 }
93
94
95 public void setContinuation(Continuation continuation)
96 {
97 synchronized (this)
98 {
99 Continuation oldContinuation = _continuation;
100 _continuation = continuation;
101 Log.debug("Old continuation {}, new continuation {}", oldContinuation, continuation);
102
103
104
105
106
107
108
109 if (oldContinuation != null && oldContinuation.isPending())
110 {
111 try
112 {
113 int responseCode = HttpServletResponse.SC_REQUEST_TIMEOUT;
114 Log.debug("Sending {} on old continuation {}", responseCode, oldContinuation);
115 HttpServletResponse response = (HttpServletResponse)oldContinuation.getObject();
116 response.sendError(responseCode);
117 }
118 catch(Exception x)
119 {
120 Log.ignore(x);
121 }
122
123 try
124 {
125 Log.debug("Resuming old continuation {}", oldContinuation);
126
127 oldContinuation.resume();
128 }
129 catch (Exception x)
130 {
131 Log.ignore(x);
132 }
133 }
134
135 if (continuation == null)
136 {
137
138 if (_intervalTimeoutTask != null)
139 _bayeux.startTimeout(_intervalTimeoutTask, _bayeux.getMaxInterval());
140 }
141 else
142 {
143 _bayeux.cancelTimeout(_intervalTimeoutTask);
144 _accessed = _bayeux.getNow();
145 }
146 }
147 }
148
149
150 public Continuation getContinuation()
151 {
152 return _continuation;
153 }
154
155
156 @Override
157 public void lazyResume()
158 {
159 int max=_bayeux.getMaxLazyLatency();
160 if (max>0 && _lazyTimeoutTask!=null && !_lazyResuming)
161 {
162 _lazyResuming=true;
163
164 _bayeux.startTimeout(_lazyTimeoutTask,_accessed%max);
165 }
166 }
167
168
169 @Override
170 public void resume()
171 {
172 synchronized(this)
173 {
174 if (_continuation != null)
175 {
176 _continuation.resume();
177 }
178 _continuation=null;
179 }
180 }
181
182
183 @Override
184 public boolean isLocal()
185 {
186 return false;
187 }
188
189
190 public void access()
191 {
192 synchronized(this)
193 {
194 _accessed=_bayeux.getNow();
195 if (_intervalTimeoutTask != null && _intervalTimeoutTask.isScheduled())
196 {
197
198
199 _intervalTimeoutTask.reschedule();
200 }
201 }
202 }
203
204
205 public synchronized long lastAccessed()
206 {
207 return _accessed;
208 }
209
210
211
212
213
214
215
216 @Override
217 public void remove(boolean wasTimeout)
218 {
219 synchronized(this)
220 {
221 if (!wasTimeout && _intervalTimeoutTask != null)
222 _bayeux.cancelTimeout(_intervalTimeoutTask);
223 }
224 super.remove(wasTimeout);
225 }
226 }