1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.jetty.client;
15
16 import java.io.IOException;
17 import java.lang.reflect.Constructor;
18 import java.util.ArrayList;
19 import java.util.LinkedList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22 import javax.servlet.http.Cookie;
23
24 import org.mortbay.io.Buffer;
25 import org.mortbay.io.ByteArrayBuffer;
26 import org.mortbay.jetty.HttpHeaders;
27 import org.mortbay.jetty.client.security.Authorization;
28 import org.mortbay.jetty.client.security.SecurityListener;
29 import org.mortbay.jetty.servlet.PathMap;
30 import org.mortbay.log.Log;
31
32
33
34
35
36 public class HttpDestination
37 {
// Host header value ("host" or "host:port") built once in the constructor and returned by getHostHeader().
private ByteArrayBuffer _hostHeader;
// Remote address this destination was created for.
private final Address _address;
// Connections registered through onNewConnection() and not yet removed; every access in this class holds the monitor on this.
private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
// Connections parked without an exchange; used as a stack (the most recently added is handed out first).
private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
// Owning client: supplies the connector, the registered listener class names and the started state.
private final HttpClient _client;
// SSL flag passed to the constructor; also selects the default port (443 vs 80) for the host header.
private final boolean _ssl;
// Limit compared against (established + pending) connections before another connection is started.
private int _maxConnections;
// Incremented by startNewConnection(); decremented by onNewConnection(), onConnectionFailed() and onException().
private int _pendingConnections=0;
// Hand-off queue carrying connect results (an HttpConnection or the failure Throwable) to callers blocked in getConnection().
private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
// Number of getConnection() callers that started a connection and are waiting for its result on _newQueue.
private int _newConnection=0;
// Proxy address, or null when this destination is not proxied (see isProxied()).
private Address _proxy;
// Authorization associated with the proxy (presumably its credentials -- this class only stores and returns it).
private Authorization _proxyAuthentication;
// Path spec -> Authorization map, created lazily by addAuthorization().
private PathMap _authorizations;
// Cookies added to every exchange by doSend(); created lazily by addCookie().
private List<Cookie> _cookies;
52
53 public void dump() throws IOException
54 {
55 synchronized (this)
56 {
57 System.err.println(this);
58 System.err.println("connections="+_connections.size());
59 System.err.println("idle="+_idle.size());
60 System.err.println("pending="+_pendingConnections);
61 for (HttpConnection c : _connections)
62 {
63 if (!c.isIdle())
64 c.dump();
65 }
66 }
67 }
68
69
// FIFO of exchanges waiting for a connection; every access in this class holds the monitor on this.
private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
71
72
73 HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
74 {
75 _client=pool;
76 _address=address;
77 _ssl=ssl;
78 _maxConnections=maxConnections;
79 String addressString = address.getHost();
80 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
81 _hostHeader = new ByteArrayBuffer(addressString);
82 }
83
84
85 public Address getAddress()
86 {
87 return _address;
88 }
89
90
91 public Buffer getHostHeader()
92 {
93 return _hostHeader;
94 }
95
96
97 public HttpClient getHttpClient()
98 {
99 return _client;
100 }
101
102
103 public boolean isSecure()
104 {
105 return _ssl;
106 }
107
108
109 public int getConnections()
110 {
111 synchronized (this)
112 {
113 return _connections.size();
114 }
115 }
116
117
118 public int getIdleConnections()
119 {
120 synchronized (this)
121 {
122 return _idle.size();
123 }
124 }
125
126
127 public void addAuthorization(String pathSpec,Authorization authorization)
128 {
129 synchronized (this)
130 {
131 if (_authorizations==null)
132 _authorizations=new PathMap();
133 _authorizations.put(pathSpec,authorization);
134 }
135
136
137 }
138
139
140 public void addCookie(Cookie cookie)
141 {
142 synchronized (this)
143 {
144 if (_cookies==null)
145 _cookies=new ArrayList<Cookie>();
146 _cookies.add(cookie);
147 }
148
149
150 }
151
152
153
154
155
156
157
158
159
160
161 private HttpConnection getConnection(long timeout) throws IOException
162 {
163 HttpConnection connection = null;
164
165 while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
166 {
167 boolean starting = false;
168 synchronized (this)
169 {
170 int totalConnections = _connections.size() + _pendingConnections;
171 if (totalConnections < _maxConnections)
172 {
173 _newConnection++;
174 startNewConnection();
175 starting = true;
176 }
177 }
178
179 if (!starting)
180 {
181 try
182 {
183 Thread.sleep(200);
184 timeout-=200;
185 }
186 catch (InterruptedException e)
187 {
188 Log.ignore(e);
189 }
190 }
191 else
192 {
193 try
194 {
195 Object o = _newQueue.take();
196 if (o instanceof HttpConnection)
197 {
198 connection = (HttpConnection)o;
199 }
200 else
201 throw (IOException)o;
202 }
203 catch (InterruptedException e)
204 {
205 Log.ignore(e);
206 }
207 }
208 }
209 return connection;
210 }
211
212
213 public HttpConnection reserveConnection(long timeout) throws IOException
214 {
215 HttpConnection connection = getConnection(timeout);
216 if (connection != null)
217 connection.setReserved(true);
218 return connection;
219 }
220
221
222 public HttpConnection getIdleConnection() throws IOException
223 {
224 HttpConnection connection = null;
225 while (true)
226 {
227 synchronized (this)
228 {
229 if (connection!=null)
230 {
231 _connections.remove(connection);
232 connection.close();
233 connection=null;
234 }
235 if (_idle.size() > 0)
236 connection = _idle.remove(_idle.size()-1);
237 }
238
239 if (connection==null)
240 return null;
241
242 if (connection.cancelIdleTimeout() )
243 return connection;
244
245 }
246 }
247
248
249 protected void startNewConnection()
250 {
251 try
252 {
253 synchronized (this)
254 {
255 _pendingConnections++;
256 }
257 _client._connector.startConnection(this);
258 }
259 catch(Exception e)
260 {
261 Log.debug(e);
262 onConnectionFailed(e);
263 }
264 }
265
266
267 public void onConnectionFailed(Throwable throwable)
268 {
269 Throwable connect_failure=null;
270
271 synchronized (this)
272 {
273 _pendingConnections--;
274 if (_newConnection>0)
275 {
276 connect_failure=throwable;
277 _newConnection--;
278 }
279 else if (_queue.size()>0)
280 {
281 HttpExchange ex=_queue.removeFirst();
282 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
283 ex.getEventListener().onConnectionFailed(throwable);
284 }
285 }
286
287 if(connect_failure!=null)
288 {
289 try
290 {
291 _newQueue.put(connect_failure);
292 }
293 catch (InterruptedException e)
294 {
295 Log.ignore(e);
296 }
297 }
298 }
299
300
301 public void onException(Throwable throwable)
302 {
303 synchronized (this)
304 {
305 _pendingConnections--;
306 if (_queue.size()>0)
307 {
308 HttpExchange ex=_queue.removeFirst();
309 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
310 ex.getEventListener().onException(throwable);
311 }
312 }
313 }
314
315
316 public void onNewConnection(HttpConnection connection) throws IOException
317 {
318 HttpConnection q_connection=null;
319
320 synchronized (this)
321 {
322 _pendingConnections--;
323 _connections.add(connection);
324
325 if (_newConnection>0)
326 {
327 q_connection=connection;
328 _newConnection--;
329 }
330 else if (_queue.size()==0)
331 {
332 connection.setIdleTimeout();
333 _idle.add(connection);
334 }
335 else
336 {
337 HttpExchange ex=_queue.removeFirst();
338 connection.send(ex);
339 }
340 }
341
342 if (q_connection!=null)
343 {
344 try
345 {
346 _newQueue.put(q_connection);
347 }
348 catch (InterruptedException e)
349 {
350 Log.ignore(e);
351 }
352 }
353 }
354
355
356 public void returnConnection(HttpConnection connection, boolean close) throws IOException
357 {
358 if (connection.isReserved())
359 connection.setReserved(false);
360
361 if (close)
362 {
363 try
364 {
365 connection.close();
366 }
367 catch(IOException e)
368 {
369 Log.ignore(e);
370 }
371 }
372
373 if (!_client.isStarted())
374 return;
375
376 if (!close && connection.getEndPoint().isOpen())
377 {
378 synchronized (this)
379 {
380 if (_queue.size()==0)
381 {
382 connection.setIdleTimeout();
383 _idle.add(connection);
384 }
385 else
386 {
387 HttpExchange ex = _queue.removeFirst();
388 connection.send(ex);
389 }
390 this.notifyAll();
391 }
392 }
393 else
394 {
395 synchronized (this)
396 {
397 _connections.remove(connection);
398 if (!_queue.isEmpty())
399 startNewConnection();
400 }
401 }
402 }
403
404
405 public void returnIdleConnection(HttpConnection connection) throws IOException
406 {
407 try
408 {
409 connection.close();
410 }
411 catch (IOException e)
412 {
413 Log.ignore(e);
414 }
415
416 synchronized (this)
417 {
418 _idle.remove(connection);
419 _connections.remove(connection);
420 if (!_queue.isEmpty() && _client.isStarted())
421 startNewConnection();
422 }
423 }
424
425
426
427 public void send(HttpExchange ex) throws IOException
428 {
429 LinkedList<String> listeners = _client.getRegisteredListeners();
430
431 if (listeners != null)
432 {
433
434 for (int i = listeners.size(); i > 0; --i)
435 {
436 String listenerClass = listeners.get(i - 1);
437
438 try
439 {
440 Class listener = Class.forName(listenerClass);
441 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
442 HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
443 ex.setEventListener(elistener);
444 }
445 catch (Exception e)
446 {
447 Log.debug(e);
448 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
449 }
450 }
451 }
452
453
454 if ( _client.hasRealms() )
455 {
456 ex.setEventListener( new SecurityListener( this, ex ) );
457 }
458
459 doSend(ex);
460 }
461
462
463 public void resend(HttpExchange ex) throws IOException
464 {
465 ex.getEventListener().onRetry();
466 ex.reset();
467 doSend(ex);
468 }
469
470
471 protected void doSend(HttpExchange ex) throws IOException
472 {
473
474
475 if (_cookies!=null)
476 {
477 StringBuilder buf=null;
478 for (Cookie cookie : _cookies)
479 {
480 if (buf==null)
481 buf=new StringBuilder();
482 else
483 buf.append("; ");
484 buf.append(cookie.getName());
485 buf.append("=");
486 buf.append(cookie.getValue());
487 }
488 if (buf!=null)
489 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
490 }
491
492
493 if (_authorizations!=null)
494 {
495 Authorization auth= (Authorization)_authorizations.match(ex.getURI());
496 if (auth !=null)
497 auth.setCredentials(ex);
498 }
499
500 HttpConnection connection = getIdleConnection();
501 if (connection != null)
502 {
503 boolean sent = connection.send(ex);
504 if (!sent) connection = null;
505 }
506
507 if (connection == null)
508 {
509 synchronized (this)
510 {
511 _queue.add(ex);
512 if (_connections.size() + _pendingConnections < _maxConnections)
513 {
514 startNewConnection();
515 }
516 }
517 }
518 }
519
520
521 public synchronized String toString()
522 {
523 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
524 }
525
526
527 public synchronized String toDetailString()
528 {
529 StringBuilder b = new StringBuilder();
530 b.append(toString());
531 b.append('\n');
532 synchronized(this)
533 {
534 for (HttpConnection connection : _connections)
535 {
536 if (connection._exchange!=null)
537 {
538 b.append(connection.toDetailString());
539 if (_idle.contains(connection))
540 b.append(" IDLE");
541 b.append('\n');
542 }
543 }
544 }
545 b.append("--");
546 b.append('\n');
547
548 return b.toString();
549 }
550
551
552 public void setProxy(Address proxy)
553 {
554 _proxy=proxy;
555 }
556
557
558 public Address getProxy()
559 {
560 return _proxy;
561 }
562
563
564 public Authorization getProxyAuthentication()
565 {
566 return _proxyAuthentication;
567 }
568
569
570 public void setProxyAuthentication(Authorization authentication)
571 {
572 _proxyAuthentication = authentication;
573 }
574
575
576 public boolean isProxied()
577 {
578 return _proxy!=null;
579 }
580
581
582 public void close() throws IOException
583 {
584 synchronized (this)
585 {
586 for (HttpConnection connection : _connections)
587 {
588 connection.close();
589 }
590 }
591 }
592
593 }