View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd.continuation;
16  
17  import javax.servlet.http.HttpServletResponse;
18  
19  import org.mortbay.cometd.ClientImpl;
20  import org.mortbay.log.Log;
21  import org.mortbay.thread.Timeout;
22  import org.mortbay.util.ajax.Continuation;
23  
24  /* ------------------------------------------------------------ */
25  /**
26   * Extension of {@link ClientImpl} that uses {@link Continuation}s to resume
27   * clients waiting for messages. Continuation clients are used for remote
28   * clients and have removed if they are not accessed within an idle timeout (@link
29   * {@link ContinuationBayeux#_clientTimer}).
30   *
31   * @author gregw
32   *
33   */
34  public class ContinuationClient extends ClientImpl
35  {
36      private final ContinuationBayeux _bayeux;
37      private final Timeout.Task _intervalTimeoutTask;
38      private final Timeout.Task _lazyTimeoutTask;
39      private long _accessed;
40      private volatile Continuation _continuation;
41      private volatile boolean _lazyResuming;
42  
43      /* ------------------------------------------------------------ */
44      protected ContinuationClient(ContinuationBayeux bayeux)
45      {
46          super(bayeux);
47          _bayeux=bayeux;
48  
49          if (isLocal())
50          {
51              _intervalTimeoutTask=null;
52              _lazyTimeoutTask=null;
53          }
54          else
55          {
56              // The timeout task for when a long poll does not arrive.
57              _intervalTimeoutTask=new Timeout.Task()
58              {
59                  @Override
60                  public void expired()
61                  {
62                      remove(true);
63                  }
64  
65                  @Override
66                  public String toString()
67                  {
68                      return "T-" + ContinuationClient.this.toString();
69                  }
70              };
71  
72              // The timeout task for lazy messages
73              _lazyTimeoutTask=new Timeout.Task()
74              {
75                  @Override
76                  public void expired()
77                  {
78                      _lazyResuming=false;
79                      if (hasMessages())
80                          resume();
81                  }
82  
83                  @Override
84                  public String toString()
85                  {
86                      return "L-" + ContinuationClient.this.toString();
87                  }
88              };
89  
90              _bayeux.startTimeout(_intervalTimeoutTask,_bayeux.getMaxInterval());
91          }
92      }
93  
94      /* ------------------------------------------------------------ */
95      public void setContinuation(Continuation continuation)
96      {
97          synchronized (this)
98          {
99              Continuation oldContinuation = _continuation;
100             _continuation = continuation;
101             Log.debug("Old continuation {}, new continuation {}", oldContinuation, continuation);
102 
103             // There can be a suspended old continuation if the remote client reloads or
104             // somehow re-issues a new long poll request with an outstanding long poll request.
105             // In this case we resume() the existing continuation, otherwise
106             // it will expire, this method is entered again with a null argument, and a timeout
107             // to expire this client will be scheduled (which is wrong because client expiration
108             // must be handled by the new continuation and not by the old one, which dies here)
109             if (oldContinuation != null && oldContinuation.isPending())
110             {
111                 try
112                 {
113                     int responseCode = HttpServletResponse.SC_REQUEST_TIMEOUT;
114                     Log.debug("Sending {} on old continuation {}", responseCode, oldContinuation);
115                     HttpServletResponse response = (HttpServletResponse)oldContinuation.getObject();
116                     response.sendError(responseCode);
117                 }
118                 catch(Exception x)
119                 {
120                     Log.ignore(x);
121                 }
122 
123                 try
124                 {
125                     Log.debug("Resuming old continuation {}", oldContinuation);
126                     // The response is committed so this resume() will be blocked by ContinuationCometdServlet
127                     oldContinuation.resume();
128                 }
129                 catch (Exception x)
130                 {
131                     Log.ignore(x);
132                 }
133             }
134 
135             if (continuation == null)
136             {
137                 // Set timeout when to expect the next long poll
138                 if (_intervalTimeoutTask != null)
139                     _bayeux.startTimeout(_intervalTimeoutTask, _bayeux.getMaxInterval());
140             }
141             else
142             {
143                 _bayeux.cancelTimeout(_intervalTimeoutTask);
144                 _accessed = _bayeux.getNow();
145             }
146         }
147     }
148 
149     /* ------------------------------------------------------------ */
150     public Continuation getContinuation()
151     {
152         return _continuation;
153     }
154 
155     /* ------------------------------------------------------------ */
156     @Override
157     public void lazyResume()
158     {
159         int max=_bayeux.getMaxLazyLatency();
160         if (max>0 && _lazyTimeoutTask!=null && !_lazyResuming)
161         {
162             _lazyResuming=true;
163             // use modulo so all lazy clients do not wakeup at once
164             _bayeux.startTimeout(_lazyTimeoutTask,_accessed%max);
165         }
166     }
167 
168     /* ------------------------------------------------------------ */
169     @Override
170     public void resume()
171     {
172         synchronized(this)
173         {
174             if (_continuation != null)
175             {
176                 _continuation.resume();
177             }
178             _continuation=null;
179         }
180     }
181 
182     /* ------------------------------------------------------------ */
183     @Override
184     public boolean isLocal()
185     {
186         return false;
187     }
188 
189     /* ------------------------------------------------------------ */
190     public void access()
191     {
192         synchronized(this)
193         {
194             _accessed=_bayeux.getNow();
195             if (_intervalTimeoutTask != null && _intervalTimeoutTask.isScheduled())
196             {
197                 // reschedule the timer even though it may be cancelled next...
198                 // it might not be.
199                 _intervalTimeoutTask.reschedule();
200             }
201         }
202     }
203 
204     /* ------------------------------------------------------------ */
205     public synchronized long lastAccessed()
206     {
207         return _accessed;
208     }
209 
210     /* ------------------------------------------------------------ */
211     /*
212      * (non-Javadoc)
213      *
214      * @see org.mortbay.cometd.ClientImpl#remove(boolean)
215      */
216     @Override
217     public void remove(boolean wasTimeout)
218     {
219         synchronized(this)
220         {
221             if (!wasTimeout && _intervalTimeoutTask != null)
222                 _bayeux.cancelTimeout(_intervalTimeoutTask);
223         }
224         super.remove(wasTimeout);
225     }
226 }