1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.client;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import org.mortbay.io.Buffer;
23 import org.mortbay.io.Buffers;
24 import org.mortbay.io.ByteArrayBuffer;
25 import org.mortbay.io.Connection;
26 import org.mortbay.io.EndPoint;
27 import org.mortbay.io.View;
28 import org.mortbay.io.nio.SelectChannelEndPoint;
29 import org.mortbay.jetty.HttpGenerator;
30 import org.mortbay.jetty.HttpHeaderValues;
31 import org.mortbay.jetty.HttpHeaders;
32 import org.mortbay.jetty.HttpParser;
33 import org.mortbay.jetty.HttpSchemes;
34 import org.mortbay.jetty.HttpVersions;
35 import org.mortbay.jetty.client.security.Authorization;
36 import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37 import org.mortbay.log.Log;
38 import org.mortbay.thread.Timeout;
39
40
41
42
43
44
45 public class HttpConnection implements Connection
46 {
47 HttpDestination _destination;
48 EndPoint _endp;
49 HttpGenerator _generator;
50 HttpParser _parser;
51 boolean _http11 = true;
52 Buffer _connectionHeader;
53 Buffer _requestContentChunk;
54 boolean _requestComplete;
55 public boolean _reserved;
56
57 volatile HttpExchange _exchange;
58 HttpExchange _pipeline;
59 private final Timeout.Task _timeout = new TimeoutTask();
60 private AtomicBoolean _idle = new AtomicBoolean(false);
61
62 public void dump() throws IOException
63 {
64 System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65 System.err.println("generator=" + _generator);
66 System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67 System.err.println("exchange=" + _exchange);
68 if (_endp instanceof SslHttpChannelEndPoint)
69 ((SslHttpChannelEndPoint)_endp).dump();
70 }
71
72
73 HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74 {
75 _endp = endp;
76 _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77 _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78 }
79
80 public void setReserved (boolean reserved)
81 {
82 _reserved = reserved;
83 }
84
85 public boolean isReserved()
86 {
87 return _reserved;
88 }
89
90
91 public HttpDestination getDestination()
92 {
93 return _destination;
94 }
95
96
97 public void setDestination(HttpDestination destination)
98 {
99 _destination = destination;
100 }
101
102
103 public boolean send(HttpExchange ex) throws IOException
104 {
105
106
107
108 synchronized (this)
109 {
110 if (_exchange != null)
111 {
112 if (_pipeline != null)
113 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
114 _pipeline = ex;
115 return true;
116 }
117
118 if (!_endp.isOpen())
119 return false;
120
121 _exchange = ex;
122 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123
124 if (_endp.isBlocking())
125 {
126 this.notify();
127 }
128 else
129 {
130 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131 scep.scheduleWrite();
132 }
133
134 long exchTimeout = _exchange.getTimeout();
135
136 if (exchTimeout > 0)
137 {
138 _destination.getHttpClient().schedule(_timeout, exchTimeout);
139 }
140 else
141 {
142 _destination.getHttpClient().schedule(_timeout);
143 }
144
145 return true;
146 }
147 }
148
149
150 public void handle() throws IOException
151 {
152 int no_progress = 0;
153
154 boolean failed = false;
155 while (_endp.isBufferingInput() || _endp.isOpen())
156 {
157 synchronized (this)
158 {
159 while (_exchange == null)
160 {
161 if (_endp.isBlocking())
162 {
163 try
164 {
165 this.wait();
166 }
167 catch (InterruptedException e)
168 {
169 throw new InterruptedIOException();
170 }
171 }
172 else
173 {
174
175 _parser.fill();
176 _parser.skipCRLF();
177 if (_parser.isMoreInBuffer())
178 {
179 Log.warn("Unexpected data received but no request sent");
180 close();
181 }
182 return;
183 }
184 }
185 }
186 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
187 {
188 no_progress = 0;
189 commitRequest();
190 }
191
192 try
193 {
194 long io = 0;
195 _endp.flush();
196
197 if (_generator.isComplete())
198 {
199 if (!_requestComplete)
200 {
201 _requestComplete = true;
202 _exchange.getEventListener().onRequestComplete();
203 }
204 }
205 else
206 {
207
208 synchronized (this)
209 {
210 if (_exchange == null)
211 continue;
212 long flushed = _generator.flush();
213 io += flushed;
214 }
215
216 if (!_generator.isComplete())
217 {
218 InputStream in = _exchange.getRequestContentSource();
219 if (in != null)
220 {
221 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
222 {
223 _requestContentChunk = _exchange.getRequestContentChunk();
224 if (_requestContentChunk != null)
225 _generator.addContent(_requestContentChunk,false);
226 else
227 _generator.complete();
228 io += _generator.flush();
229 }
230 }
231 else
232 _generator.complete();
233 }
234 }
235
236 if (_generator.isComplete() && !_requestComplete)
237 {
238 _requestComplete = true;
239 _exchange.getEventListener().onRequestComplete();
240 }
241
242
243 if (!_parser.isComplete() && _generator.isCommitted())
244 {
245 long filled = _parser.parseAvailable();
246 io += filled;
247 }
248
249 if (io > 0)
250 no_progress = 0;
251 else if (no_progress++ >= 2 && !_endp.isBlocking())
252 {
253
254 if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
255 {
256 if (_generator.flush()>0)
257 continue;
258 }
259 return;
260 }
261 }
262 catch (Throwable e)
263 {
264 Log.debug("Failure on " + _exchange, e);
265
266 if (e instanceof ThreadDeath)
267 throw (ThreadDeath)e;
268
269 synchronized (this)
270 {
271 if (_exchange != null)
272 {
273 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
274 _exchange.getEventListener().onException(e);
275 }
276 }
277
278 failed = true;
279 if (e instanceof IOException)
280 throw (IOException)e;
281
282 if (e instanceof Error)
283 throw (Error)e;
284
285 if (e instanceof RuntimeException)
286 throw (RuntimeException)e;
287
288 throw new RuntimeException(e);
289 }
290 finally
291 {
292 boolean complete = false;
293 boolean close = failed;
294 if (!failed)
295 {
296
297 if (_generator.isComplete())
298 {
299 if (!_requestComplete)
300 {
301 _requestComplete = true;
302 _exchange.getEventListener().onRequestComplete();
303 }
304
305
306
307 if (_parser.isComplete())
308 {
309 _destination.getHttpClient().cancel(_timeout);
310 complete = true;
311 }
312 }
313 }
314
315 if (complete || failed)
316 {
317 synchronized (this)
318 {
319 if (!close)
320 close = shouldClose();
321
322 reset(true);
323
324 no_progress = 0;
325 if (_exchange != null)
326 {
327 _exchange = null;
328
329 if (_pipeline == null)
330 {
331 if (!isReserved())
332 _destination.returnConnection(this,close);
333 }
334 else
335 {
336 if (close)
337 {
338 if (!isReserved())
339 _destination.returnConnection(this,close);
340
341 HttpExchange exchange = _pipeline;
342 _pipeline = null;
343 _destination.send(exchange);
344 }
345 else
346 {
347 HttpExchange exchange = _pipeline;
348 _pipeline = null;
349 send(exchange);
350 }
351 }
352 }
353 }
354 }
355 }
356 }
357 }
358
359
360 public boolean isIdle()
361 {
362 synchronized (this)
363 {
364 return _exchange == null;
365 }
366 }
367
368
369 public EndPoint getEndPoint()
370 {
371 return _endp;
372 }
373
374
375 private void commitRequest() throws IOException
376 {
377 synchronized (this)
378 {
379 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
380 throw new IllegalStateException();
381
382 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
383 _generator.setVersion(_exchange._version);
384
385 String uri = _exchange._uri;
386 if (_destination.isProxied() && uri.startsWith("/"))
387 {
388
389 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
390 + _destination.getAddress().getPort() + uri;
391 Authorization auth = _destination.getProxyAuthentication();
392 if (auth != null)
393 auth.setCredentials(_exchange);
394 }
395
396 _generator.setRequest(_exchange._method,uri);
397
398 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
399 {
400 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
401 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
402 }
403
404 if (_exchange._requestContent != null)
405 {
406 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
407 _generator.completeHeader(_exchange._requestFields,false);
408 _generator.addContent(new View(_exchange._requestContent),true);
409 }
410 else if (_exchange._requestContentSource != null)
411 {
412 _generator.completeHeader(_exchange._requestFields,false);
413 int available = _exchange._requestContentSource.available();
414 if (available > 0)
415 {
416
417
418
419 byte[] buf = new byte[available];
420 int length = _exchange._requestContentSource.read(buf);
421 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
422 }
423 }
424 else
425 {
426 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
427 _generator.completeHeader(_exchange._requestFields,true);
428 }
429
430 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
431 }
432 }
433
434
435 protected void reset(boolean returnBuffers) throws IOException
436 {
437 _requestComplete = false;
438 _connectionHeader = null;
439 _parser.reset(returnBuffers);
440 _generator.reset(returnBuffers);
441 _http11 = true;
442 }
443
444
445 private boolean shouldClose()
446 {
447 if (_connectionHeader!=null)
448 {
449 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
450 return true;
451 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
452 return false;
453 }
454 return !_http11;
455 }
456
457
458 private class Handler extends HttpParser.EventHandler
459 {
460 @Override
461 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
462 {
463
464
465
466
467
468 }
469
470 @Override
471 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
472 {
473 HttpExchange exchange = _exchange;
474 if (exchange!=null)
475 {
476 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
477 exchange.getEventListener().onResponseStatus(version,status,reason);
478 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
479 }
480 }
481
482 @Override
483 public void parsedHeader(Buffer name, Buffer value) throws IOException
484 {
485 HttpExchange exchange = _exchange;
486 if (exchange!=null)
487 {
488 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
489 {
490 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
491 }
492 exchange.getEventListener().onResponseHeader(name,value);
493 }
494 }
495
496 @Override
497 public void headerComplete() throws IOException
498 {
499 HttpExchange exchange = _exchange;
500 if (exchange!=null)
501 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
502 }
503
504 @Override
505 public void content(Buffer ref) throws IOException
506 {
507 HttpExchange exchange = _exchange;
508 if (exchange!=null)
509 exchange.getEventListener().onResponseContent(ref);
510 }
511
512 @Override
513 public void messageComplete(long contextLength) throws IOException
514 {
515 HttpExchange exchange = _exchange;
516 if (exchange!=null)
517 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
518 }
519 }
520
521
522 public String toString()
523 {
524 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
525 }
526
527
528 public String toDetailString()
529 {
530 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
531 }
532
533
534 public void close() throws IOException
535 {
536 try
537 {
538 _endp.close();
539 }
540 finally
541 {
542 HttpExchange exchange=_exchange;
543 if (exchange!=null)
544 {
545 int status = exchange.getStatus();
546 if (status>HttpExchange.STATUS_START && status<HttpExchange.STATUS_COMPLETED)
547 exchange.onException(new IOException("CLOSED"));
548 }
549 }
550 }
551
552
553
554 public void setIdleTimeout()
555 {
556 synchronized (this)
557 {
558 if (_idle.compareAndSet(false,true))
559 _destination.getHttpClient().scheduleIdle(_timeout);
560 else
561 throw new IllegalStateException();
562 }
563 }
564
565
566 public boolean cancelIdleTimeout()
567 {
568 synchronized (this)
569 {
570 if (_idle.compareAndSet(true,false))
571 {
572 _destination.getHttpClient().cancel(_timeout);
573 return true;
574 }
575 }
576
577 return false;
578 }
579
580
581
582
583 private class TimeoutTask extends Timeout.Task
584 {
585 public void expired()
586 {
587 HttpExchange ex=null;
588 try
589 {
590 synchronized (HttpConnection.this)
591 {
592 ex = _exchange;
593 _exchange = null;
594 if (ex != null)
595 {
596 _destination.returnConnection(HttpConnection.this,true);
597 }
598 else if (_idle.compareAndSet(true,false))
599 {
600 _destination.returnIdleConnection(HttpConnection.this);
601 }
602 }
603 }
604 catch (Exception e)
605 {
606 Log.debug(e);
607 }
608 finally
609 {
610 try
611 {
612 close();
613 }
614 catch (IOException e)
615 {
616 Log.ignore(e);
617 }
618
619 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
620 {
621 ex.setStatus(HttpExchange.STATUS_EXPIRED);
622 }
623 }
624 }
625 }
626
627 }