View Javadoc

1   // ========================================================================
2   // Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.jetty.client;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import org.mortbay.io.Buffer;
23  import org.mortbay.io.Buffers;
24  import org.mortbay.io.ByteArrayBuffer;
25  import org.mortbay.io.Connection;
26  import org.mortbay.io.EndPoint;
27  import org.mortbay.io.View;
28  import org.mortbay.io.nio.SelectChannelEndPoint;
29  import org.mortbay.jetty.HttpGenerator;
30  import org.mortbay.jetty.HttpHeaderValues;
31  import org.mortbay.jetty.HttpHeaders;
32  import org.mortbay.jetty.HttpParser;
33  import org.mortbay.jetty.HttpSchemes;
34  import org.mortbay.jetty.HttpVersions;
35  import org.mortbay.jetty.client.security.Authorization;
36  import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37  import org.mortbay.log.Log;
38  import org.mortbay.thread.Timeout;
39  
40  /**
41   *
42   * @author Greg Wilkins
43   * @author Guillaume Nodet
44   */
45  public class HttpConnection implements Connection
46  {
47      HttpDestination _destination;
48      EndPoint _endp;
49      HttpGenerator _generator;
50      HttpParser _parser;
51      boolean _http11 = true;
52      Buffer _connectionHeader;
53      Buffer _requestContentChunk;
54      boolean _requestComplete;
55      public boolean _reserved;
56      // The current exchange waiting for a response
57      volatile HttpExchange _exchange;
58      HttpExchange _pipeline;
59      private final Timeout.Task _timeout = new TimeoutTask();
60      private AtomicBoolean _idle = new AtomicBoolean(false);
61  
62      public void dump() throws IOException
63      {
64          System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65          System.err.println("generator=" + _generator);
66          System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67          System.err.println("exchange=" + _exchange);
68          if (_endp instanceof SslHttpChannelEndPoint)
69              ((SslHttpChannelEndPoint)_endp).dump();
70      }
71  
72      /* ------------------------------------------------------------ */
73      HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74      {
75          _endp = endp;
76          _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77          _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78      }
79  
80      public void setReserved (boolean reserved)
81      {
82          _reserved = reserved;
83      }
84  
85      public boolean isReserved()
86      {
87          return _reserved;
88      }
89  
90      /* ------------------------------------------------------------ */
91      public HttpDestination getDestination()
92      {
93          return _destination;
94      }
95  
96      /* ------------------------------------------------------------ */
97      public void setDestination(HttpDestination destination)
98      {
99          _destination = destination;
100     }
101 
102     /* ------------------------------------------------------------ */
103     public boolean send(HttpExchange ex) throws IOException
104     {
105         // _message =
106         // Thread.currentThread().getName()+": Generator instance="+_generator
107         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
108         synchronized (this)
109         {
110             if (_exchange != null)
111             {
112                 if (_pipeline != null)
113                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
114                 _pipeline = ex;
115                 return true;
116             }
117 
118             if (!_endp.isOpen())
119                 return false;
120 
121             _exchange = ex;
122             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123 
124             if (_endp.isBlocking())
125             {
126                 this.notify();
127             }
128             else
129             {
130                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131                 scep.scheduleWrite();
132             }
133             
134             long exchTimeout = _exchange.getTimeout();
135             
136             if (exchTimeout > 0)
137             {
138                 _destination.getHttpClient().schedule(_timeout, exchTimeout);
139             } 
140             else 
141             {
142                 _destination.getHttpClient().schedule(_timeout);
143             }
144 
145             return true;
146         }
147     }
148 
149     /* ------------------------------------------------------------ */
150     public void handle() throws IOException
151     {
152         int no_progress = 0;
153 
154         boolean failed = false;
155         while (_endp.isBufferingInput() || _endp.isOpen())
156         {
157             synchronized (this)
158             {
159                 while (_exchange == null)
160                 {
161                     if (_endp.isBlocking())
162                     {
163                         try
164                         {
165                             this.wait();
166                         }
167                         catch (InterruptedException e)
168                         {
169                             throw new InterruptedIOException();
170                         }
171                     }
172                     else
173                     {
174                         // Hopefully just space?
175                         _parser.fill();
176                         _parser.skipCRLF();
177                         if (_parser.isMoreInBuffer())
178                         {
179                             Log.warn("Unexpected data received but no request sent");
180                             close();
181                         }
182                         return;
183                     }
184                 }
185             }
186             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
187             {
188                 no_progress = 0;
189                 commitRequest();
190             }
191 
192             try
193             {
194                 long io = 0;
195                 _endp.flush();
196 
197                 if (_generator.isComplete())
198                 {
199                     if (!_requestComplete)
200                     {
201                         _requestComplete = true;
202                         _exchange.getEventListener().onRequestComplete();
203                     }
204                 }
205                 else
206                 {
207                     // Write as much of the request as possible
208                     synchronized (this)
209                     {
210                         if (_exchange == null)
211                             continue;
212                         long flushed = _generator.flush();
213                         io += flushed;
214                     }
215 
216                     if (!_generator.isComplete())
217                     {
218                         InputStream in = _exchange.getRequestContentSource();
219                         if (in != null)
220                         {
221                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
222                             {
223                                 _requestContentChunk = _exchange.getRequestContentChunk();
224                                 if (_requestContentChunk != null)
225                                     _generator.addContent(_requestContentChunk,false);
226                                 else
227                                     _generator.complete();
228                                 io += _generator.flush();
229                             }
230                         }
231                         else
232                             _generator.complete();
233                     }
234                 }
235 
236                 if (_generator.isComplete() && !_requestComplete)
237                 {
238                     _requestComplete = true;
239                     _exchange.getEventListener().onRequestComplete();
240                 }
241 
242                 // If we are not ended then parse available
243                 if (!_parser.isComplete() && _generator.isCommitted())
244                 {
245                     long filled = _parser.parseAvailable();
246                     io += filled;
247                 }
248 
249                 if (io > 0)
250                     no_progress = 0;
251                 else if (no_progress++ >= 2 && !_endp.isBlocking())
252                 {
253                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
254                     if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
255                     {
256                         if (_generator.flush()>0)
257                             continue;
258                     }
259                     return;
260                 }
261             }
262             catch (Throwable e)
263             {
264                 Log.debug("Failure on " + _exchange, e);
265 
266                 if (e instanceof ThreadDeath)
267                     throw (ThreadDeath)e;
268 
269                 synchronized (this)
270                 {
271                     if (_exchange != null)
272                     {
273                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
274                         _exchange.getEventListener().onException(e);
275                     }
276                 }
277 
278                 failed = true;
279                 if (e instanceof IOException)
280                     throw (IOException)e;
281 
282                 if (e instanceof Error)
283                     throw (Error)e;
284 
285                 if (e instanceof RuntimeException)
286                     throw (RuntimeException)e;
287 
288                throw new RuntimeException(e);
289             }
290             finally
291             {
292                 boolean complete = false;
293                 boolean close = failed; // always close the connection on error
294                 if (!failed)
295                 {
296                     // are we complete?
297                     if (_generator.isComplete())
298                     {
299                         if (!_requestComplete)
300                         {
301                             _requestComplete = true;
302                             _exchange.getEventListener().onRequestComplete();
303                         }
304 
305                         // we need to return the HttpConnection to a state that
306                         // it can be reused or closed out
307                         if (_parser.isComplete())
308                         {
309                             _destination.getHttpClient().cancel(_timeout);
310                             complete = true;
311                         }
312                     }
313                 }
314 
315                 if (complete || failed)
316                 {
317                     synchronized (this)
318                     {
319                         if (!close)
320                             close = shouldClose();
321 
322                         reset(true);
323 
324                         no_progress = 0;
325                         if (_exchange != null)
326                         {
327                             _exchange = null;
328 
329                             if (_pipeline == null)
330                             {
331                                 if (!isReserved())
332                                     _destination.returnConnection(this,close);
333                             }
334                             else
335                             {
336                                 if (close)
337                                 {
338                                     if (!isReserved())
339                                         _destination.returnConnection(this,close);
340 
341                                     HttpExchange exchange = _pipeline;
342                                     _pipeline = null;
343                                     _destination.send(exchange);
344                                 }
345                                 else
346                                 {
347                                     HttpExchange exchange = _pipeline;
348                                     _pipeline = null;
349                                     send(exchange);
350                                 }
351                             }
352                         }
353                     }
354                 }
355             }
356         }
357     }
358 
359     /* ------------------------------------------------------------ */
360     public boolean isIdle()
361     {
362         synchronized (this)
363         {
364             return _exchange == null;
365         }
366     }
367 
368     /* ------------------------------------------------------------ */
369     public EndPoint getEndPoint()
370     {
371         return _endp;
372     }
373 
374     /* ------------------------------------------------------------ */
375     private void commitRequest() throws IOException
376     {
377         synchronized (this)
378         {
379             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
380                 throw new IllegalStateException();
381 
382             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
383             _generator.setVersion(_exchange._version);
384 
385             String uri = _exchange._uri;
386             if (_destination.isProxied() && uri.startsWith("/"))
387             {
388                 // TODO suppress port 80 or 443
389                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
390                         + _destination.getAddress().getPort() + uri;
391                 Authorization auth = _destination.getProxyAuthentication();
392                 if (auth != null)
393                     auth.setCredentials(_exchange);
394             }
395 
396             _generator.setRequest(_exchange._method,uri);
397 
398             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
399             {
400                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
401                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
402             }
403 
404             if (_exchange._requestContent != null)
405             {
406                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
407                 _generator.completeHeader(_exchange._requestFields,false);
408                 _generator.addContent(new View(_exchange._requestContent),true);
409             }
410             else if (_exchange._requestContentSource != null)
411             {
412                 _generator.completeHeader(_exchange._requestFields,false);
413                 int available = _exchange._requestContentSource.available();
414                 if (available > 0)
415                 {
416                     // TODO deal with any known content length
417 
418                     // TODO reuse this buffer!
419                     byte[] buf = new byte[available];
420                     int length = _exchange._requestContentSource.read(buf);
421                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
422                 }
423             }
424             else
425             {
426                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
427                 _generator.completeHeader(_exchange._requestFields,true);
428             }
429 
430             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
431         }
432     }
433 
434     /* ------------------------------------------------------------ */
435     protected void reset(boolean returnBuffers) throws IOException
436     {
437         _requestComplete = false;
438         _connectionHeader = null;
439         _parser.reset(returnBuffers);
440         _generator.reset(returnBuffers);
441         _http11 = true;
442     }
443 
444     /* ------------------------------------------------------------ */
445     private boolean shouldClose()
446     {
447         if (_connectionHeader!=null)
448         {
449             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
450                 return true;
451             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
452                 return false;
453         }
454         return !_http11;
455     }
456 
457     /* ------------------------------------------------------------ */
458     private class Handler extends HttpParser.EventHandler
459     {
460         @Override
461         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
462         {
463             // System.out.println( method.toString() + "///" + url.toString() +
464             // "///" + version.toString() );
465             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
466             // out here
467             // throw new IllegalStateException();
468         }
469 
470         @Override
471         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
472         {
473             HttpExchange exchange = _exchange;
474             if (exchange!=null)
475             {
476                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
477                 exchange.getEventListener().onResponseStatus(version,status,reason);
478                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
479             }
480         }
481 
482         @Override
483         public void parsedHeader(Buffer name, Buffer value) throws IOException
484         {
485             HttpExchange exchange = _exchange;
486             if (exchange!=null)
487             {
488                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
489                 {
490                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
491                 }
492                 exchange.getEventListener().onResponseHeader(name,value);
493             }
494         }
495 
496         @Override
497         public void headerComplete() throws IOException
498         {
499             HttpExchange exchange = _exchange;
500             if (exchange!=null)
501                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
502         }
503 
504         @Override
505         public void content(Buffer ref) throws IOException
506         {
507             HttpExchange exchange = _exchange;
508             if (exchange!=null)
509                 exchange.getEventListener().onResponseContent(ref);
510         }
511 
512         @Override
513         public void messageComplete(long contextLength) throws IOException
514         {
515             HttpExchange exchange = _exchange;
516             if (exchange!=null)
517                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
518         }
519     }
520 
521     /* ------------------------------------------------------------ */
522     public String toString()
523     {
524         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
525     }
526 
527     /* ------------------------------------------------------------ */
528     public String toDetailString()
529     {
530         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
531     }
532 
533     /* ------------------------------------------------------------ */
534     public void close() throws IOException
535     {
536         try
537         {
538             _endp.close();
539         }
540         finally
541         {
542             HttpExchange exchange=_exchange;
543             if (exchange!=null)
544             {
545                 int status = exchange.getStatus();
546                 if (status>HttpExchange.STATUS_START && status<HttpExchange.STATUS_COMPLETED)
547                     exchange.onException(new IOException("CLOSED"));
548             }
549         }
550     }
551 
552 
553     /* ------------------------------------------------------------ */
554     public void setIdleTimeout()
555     {
556         synchronized (this)
557         {
558             if (_idle.compareAndSet(false,true))
559                 _destination.getHttpClient().scheduleIdle(_timeout);
560             else
561                 throw new IllegalStateException();
562         }
563     }
564 
565     /* ------------------------------------------------------------ */
566     public boolean cancelIdleTimeout()
567     {
568         synchronized (this)
569         {
570             if (_idle.compareAndSet(true,false))
571             {
572                 _destination.getHttpClient().cancel(_timeout);
573                 return true;
574             }
575         }
576 
577         return false;
578     }
579 
580     /* ------------------------------------------------------------ */
581     /* ------------------------------------------------------------ */
582     /* ------------------------------------------------------------ */
583     private class TimeoutTask extends Timeout.Task
584     {
585         public void expired()
586         {
587             HttpExchange ex=null;
588             try
589             {
590                 synchronized (HttpConnection.this)
591                 {
592                     ex = _exchange;
593                     _exchange = null;
594                     if (ex != null)
595                     {
596                         _destination.returnConnection(HttpConnection.this,true);
597                     }
598                     else if (_idle.compareAndSet(true,false))
599                     {
600                         _destination.returnIdleConnection(HttpConnection.this);
601                     }
602                 }
603             }
604             catch (Exception e)
605             {
606                 Log.debug(e);
607             }
608             finally
609             {
610                 try
611                 {
612                     close();
613                 }
614                 catch (IOException e)
615                 {
616                     Log.ignore(e);
617                 }
618 
619                 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
620                 {
621                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
622                 }
623             }
624         }
625     }
626 
627 }