1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.client;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20
21 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
22
23 import org.mortbay.io.Buffer;
24 import org.mortbay.io.Buffers;
25 import org.mortbay.io.ByteArrayBuffer;
26 import org.mortbay.io.Connection;
27 import org.mortbay.io.EndPoint;
28 import org.mortbay.io.nio.SelectChannelEndPoint;
29 import org.mortbay.jetty.HttpGenerator;
30 import org.mortbay.jetty.HttpHeaderValues;
31 import org.mortbay.jetty.HttpHeaders;
32 import org.mortbay.jetty.HttpParser;
33 import org.mortbay.jetty.HttpSchemes;
34 import org.mortbay.jetty.HttpVersions;
35 import org.mortbay.jetty.client.security.Authorization;
36 import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37 import org.mortbay.log.Log;
38 import org.mortbay.thread.Timeout;
39
40
41
42
43
44
45 public class HttpConnection implements Connection
46 {
47 HttpDestination _destination;
48 EndPoint _endp;
49 HttpGenerator _generator;
50 HttpParser _parser;
51 boolean _http11 = true;
52 Buffer _connectionHeader;
53 Buffer _requestContentChunk;
54 long _last;
55 boolean _requestComplete;
56 public String _message;
57 public Throwable _throwable;
58
59
60 HttpExchange _exchange;
61 HttpExchange _pipeline;
62
63 public void dump() throws IOException
64 {
65 System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
66 System.err.println("generator=" + _generator);
67 System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
68 System.err.println("exchange=" + _exchange);
69 if (_endp instanceof SslHttpChannelEndPoint)
70 ((SslHttpChannelEndPoint)_endp).dump();
71 }
72
73 Timeout.Task _timeout = new Timeout.Task()
74 {
75 public void expire()
76 {
77 HttpExchange ex = null;
78 try
79 {
80 synchronized (HttpConnection.this)
81 {
82 ex = _exchange;
83 _exchange = null;
84 if (ex != null)
85 _destination.returnConnection(HttpConnection.this,true);
86 }
87 }
88 catch (Exception e)
89 {
90 Log.debug(e);
91 }
92 finally
93 {
94 try
95 {
96 _endp.close();
97 }
98 catch (IOException e)
99 {
100 Log.ignore(e);
101 }
102
103 if (ex.getStatus() < HttpExchange.STATUS_COMPLETED)
104 {
105 ex.setStatus(HttpExchange.STATUS_EXPIRED);
106 }
107 }
108 }
109
110 };
111
112
113 HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
114 {
115 _endp = endp;
116 _generator = new HttpGenerator(buffers,endp,hbs,cbs);
117 _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
118 }
119
120
121 public HttpDestination getDestination()
122 {
123 return _destination;
124 }
125
126
127 public void setDestination(HttpDestination destination)
128 {
129 _destination = destination;
130 }
131
132
133 public boolean send(HttpExchange ex) throws IOException
134 {
135
136
137
138 _throwable = new Throwable();
139 synchronized (this)
140 {
141 if (_exchange != null)
142 {
143 if (_pipeline != null)
144 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
145 _pipeline = ex;
146 return true;
147 }
148
149 if (!_endp.isOpen())
150 return false;
151
152 ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
153 _exchange = ex;
154
155 if (_endp.isBlocking())
156 this.notify();
157 else
158 {
159 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
160 scep.scheduleWrite();
161 }
162
163 if (!_endp.isBlocking())
164 _destination.getHttpClient().schedule(_timeout);
165
166 return true;
167 }
168 }
169
170
171 public void handle() throws IOException
172 {
173 int no_progress = 0;
174 long flushed = 0;
175
176 boolean failed = false;
177 while (_endp.isBufferingInput() || _endp.isOpen())
178 {
179 synchronized (this)
180 {
181 while (_exchange == null)
182 {
183 if (_endp.isBlocking())
184 {
185 try
186 {
187 this.wait();
188 }
189 catch (InterruptedException e)
190 {
191 throw new InterruptedIOException();
192 }
193 }
194 else
195 {
196
197 _parser.fill();
198 _parser.skipCRLF();
199 if (_parser.isMoreInBuffer())
200 {
201 Log.warn("unexpected data");
202 _endp.close();
203 }
204
205 return;
206 }
207 }
208 }
209 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
210 {
211 no_progress = 0;
212 commitRequest();
213 }
214
215 try
216 {
217 long io = 0;
218 _endp.flush();
219
220 if (_generator.isComplete())
221 {
222 if (!_requestComplete)
223 {
224 _requestComplete = true;
225 _exchange.getEventListener().onRequestComplete();
226 }
227 }
228 else
229 {
230
231 synchronized (this)
232 {
233 if (_exchange == null)
234 continue;
235 flushed = _generator.flush();
236 io += flushed;
237 }
238
239 if (!_generator.isComplete())
240 {
241 InputStream in = _exchange.getRequestContentSource();
242 if (in != null)
243 {
244 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
245 {
246 _requestContentChunk = _exchange.getRequestContentChunk();
247 if (_requestContentChunk != null)
248 _generator.addContent(_requestContentChunk,false);
249 else
250 _generator.complete();
251 io += _generator.flush();
252 }
253 }
254 else
255 _generator.complete();
256 }
257 }
258
259
260
261 if (!_parser.isComplete() && _generator.isCommitted())
262 {
263 long filled = _parser.parseAvailable();
264 io += filled;
265 }
266
267 if (io > 0)
268 no_progress = 0;
269 else if (no_progress++ >= 2 && !_endp.isBlocking())
270 {
271
272 if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
273 {
274 if (_generator.flush()>0)
275 continue;
276 }
277 return;
278 }
279 }
280 catch (IOException e)
281 {
282 synchronized (this)
283 {
284 if (_exchange != null)
285 {
286 _exchange.getEventListener().onException(e);
287 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
288 }
289 }
290 failed = true;
291 throw e;
292 }
293 finally
294 {
295 boolean complete = false;
296 boolean close = failed;
297 if (!failed)
298 {
299
300 if (_generator.isComplete())
301 {
302 if (!_requestComplete)
303 {
304 _requestComplete = true;
305 _exchange.getEventListener().onRequestComplete();
306 }
307
308
309
310 if (_parser.isComplete())
311 {
312 _destination.getHttpClient().cancel(_timeout);
313 complete = true;
314 }
315 }
316 }
317
318 if (complete || failed)
319 {
320 synchronized (this)
321 {
322 if (!close)
323 close = shouldClose();
324
325 reset(true);
326 no_progress = 0;
327 flushed = -1;
328 if (_exchange != null)
329 {
330 _exchange = null;
331
332 if (_pipeline == null)
333 {
334 _destination.returnConnection(this,close);
335 if (close)
336 return;
337 }
338 else
339 {
340 if (close)
341 {
342 _destination.returnConnection(this,close);
343 _destination.send(_pipeline);
344 _pipeline = null;
345 return;
346 }
347
348 HttpExchange ex = _pipeline;
349 _pipeline = null;
350
351 send(ex);
352 }
353 }
354 }
355 }
356 }
357 }
358 }
359
360
361 public boolean isIdle()
362 {
363 synchronized (this)
364 {
365 return _exchange == null;
366 }
367 }
368
369
370 public EndPoint getEndPoint()
371 {
372 return _endp;
373 }
374
375
376 private void commitRequest() throws IOException
377 {
378 synchronized (this)
379 {
380 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
381 throw new IllegalStateException();
382
383 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
384 _generator.setVersion(_exchange._version);
385
386 String uri = _exchange._uri;
387 if (_destination.isProxied() && uri.startsWith("/"))
388 {
389
390 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
391 + _destination.getAddress().getPort() + uri;
392 Authorization auth = _destination.getProxyAuthentication();
393 if (auth != null)
394 auth.setCredentials(_exchange);
395 }
396
397 _generator.setRequest(_exchange._method,uri);
398
399 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
400 {
401 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
402 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
403 }
404
405 if (_exchange._requestContent != null)
406 {
407 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
408 _generator.completeHeader(_exchange._requestFields,false);
409 _generator.addContent(_exchange._requestContent,true);
410 }
411 else if (_exchange._requestContentSource != null)
412 {
413 _generator.completeHeader(_exchange._requestFields,false);
414 int available = _exchange._requestContentSource.available();
415 if (available > 0)
416 {
417
418
419
420 byte[] buf = new byte[available];
421 int length = _exchange._requestContentSource.read(buf);
422 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
423 }
424 }
425 else
426 {
427 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
428
429
430
431
432
433 _generator.completeHeader(_exchange._requestFields,true);
434 }
435
436 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
437 }
438 }
439
440
441 protected void reset(boolean returnBuffers) throws IOException
442 {
443 _requestComplete = false;
444 _connectionHeader = null;
445 _parser.reset(returnBuffers);
446 _generator.reset(returnBuffers);
447 _http11 = true;
448 }
449
450
451 private boolean shouldClose()
452 {
453 if (_connectionHeader!=null)
454 {
455 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
456 return true;
457 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
458 return false;
459 }
460 return !_http11;
461 }
462
463
464 private class Handler extends HttpParser.EventHandler
465 {
466 @Override
467 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
468 {
469
470
471
472
473
474 }
475
476 @Override
477 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
478 {
479 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
480 _exchange.getEventListener().onResponseStatus(version,status,reason);
481 _exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
482 }
483
484 @Override
485 public void parsedHeader(Buffer name, Buffer value) throws IOException
486 {
487 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
488 {
489 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
490 }
491 _exchange.getEventListener().onResponseHeader(name,value);
492 }
493
494 @Override
495 public void headerComplete() throws IOException
496 {
497 _exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
498 }
499
500 @Override
501 public void content(Buffer ref) throws IOException
502 {
503 _exchange.getEventListener().onResponseContent(ref);
504 }
505
506 @Override
507 public void messageComplete(long contextLength) throws IOException
508 {
509 _exchange.setStatus(HttpExchange.STATUS_COMPLETED);
510 }
511 }
512
513
514 public String toString()
515 {
516 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
517 }
518
519
520 public String toDetailString()
521 {
522 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
523 }
524
525
526
527
528
529 public long getLast()
530 {
531 return _last;
532 }
533
534
535
536
537
538
539 public void setLast(long last)
540 {
541 _last = last;
542 }
543
544
545 public void close() throws IOException
546 {
547 _endp.close();
548 }
549
550 }