1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.mortbay.jetty;
17
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.Socket;
21 import java.net.UnknownHostException;
22
23 import javax.servlet.ServletRequest;
24
25 import org.mortbay.component.LifeCycle;
26 import org.mortbay.io.EndPoint;
27 import org.mortbay.log.Log;
28 import org.mortbay.thread.ThreadPool;
29 import org.mortbay.util.ajax.Continuation;
30 import org.mortbay.util.ajax.WaitingContinuation;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public abstract class AbstractConnector extends AbstractBuffers implements Connector
48 {
49 private String _name;
50
51 private Server _server;
52 private ThreadPool _threadPool;
53 private String _host;
54 private int _port=0;
55 private String _integralScheme=HttpSchemes.HTTPS;
56 private int _integralPort=0;
57 private String _confidentialScheme=HttpSchemes.HTTPS;
58 private int _confidentialPort=0;
59 private int _acceptQueueSize=0;
60 private int _acceptors=1;
61 private int _acceptorPriorityOffset=0;
62 private boolean _useDNS;
63 private boolean _forwarded;
64 private String _hostHeader;
65 private String _forwardedHostHeader = "X-Forwarded-Host";
66 private String _forwardedServerHeader = "X-Forwarded-Server";
67 private String _forwardedForHeader = "X-Forwarded-For";
68 private boolean _reuseAddress=true;
69
70 protected int _maxIdleTime=200000;
71 protected int _lowResourceMaxIdleTime=-1;
72 protected int _soLingerTime=-1;
73
74 private transient Thread[] _acceptorThread;
75
76 Object _statsLock = new Object();
77 transient long _statsStartedAt=-1;
78 transient int _requests;
79 transient int _connections;
80
81 transient int _connectionsOpen;
82 transient int _connectionsOpenMin;
83 transient int _connectionsOpenMax;
84
85 transient long _connectionsDurationMin;
86 transient long _connectionsDurationMax;
87 transient long _connectionsDurationTotal;
88
89 transient int _connectionsRequestsMin;
90 transient int _connectionsRequestsMax;
91
92
93
94
95
96 public AbstractConnector()
97 {
98 }
99
100
101
102
103 public Server getServer()
104 {
105 return _server;
106 }
107
108
109 public void setServer(Server server)
110 {
111 _server=server;
112 }
113
114
115
116
117
118 public ThreadPool getThreadPool()
119 {
120 return _threadPool;
121 }
122
123
124 public void setThreadPool(ThreadPool pool)
125 {
126 _threadPool=pool;
127 }
128
129
130
131
132 public void setHost(String host)
133 {
134 _host=host;
135 }
136
137
138
139
140 public String getHost()
141 {
142 return _host;
143 }
144
145
146
147
148
149 public void setPort(int port)
150 {
151 _port=port;
152 }
153
154
155
156
157
158 public int getPort()
159 {
160 return _port;
161 }
162
163
164
165
166
167
168 public int getMaxIdleTime()
169 {
170 return _maxIdleTime;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199 public void setMaxIdleTime(int maxIdleTime)
200 {
201 _maxIdleTime = maxIdleTime;
202 }
203
204
205
206
207
208 public int getLowResourceMaxIdleTime()
209 {
210 return _lowResourceMaxIdleTime;
211 }
212
213
214
215
216
217 public void setLowResourceMaxIdleTime(int maxIdleTime)
218 {
219 _lowResourceMaxIdleTime = maxIdleTime;
220 }
221
222
223
224
225
226 public int getSoLingerTime()
227 {
228 return _soLingerTime;
229 }
230
231
232
233
234
235 public int getAcceptQueueSize()
236 {
237 return _acceptQueueSize;
238 }
239
240
241
242
243
244 public void setAcceptQueueSize(int acceptQueueSize)
245 {
246 _acceptQueueSize = acceptQueueSize;
247 }
248
249
250
251
252
253 public int getAcceptors()
254 {
255 return _acceptors;
256 }
257
258
259
260
261
262 public void setAcceptors(int acceptors)
263 {
264 _acceptors = acceptors;
265 }
266
267
268
269
270
271 public void setSoLingerTime(int soLingerTime)
272 {
273 _soLingerTime = soLingerTime;
274 }
275
276
277 protected void doStart() throws Exception
278 {
279 if (_server==null)
280 throw new IllegalStateException("No server");
281
282
283 open();
284
285 super.doStart();
286
287 if (_threadPool==null)
288 _threadPool=_server.getThreadPool();
289 if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
290 ((LifeCycle)_threadPool).start();
291
292
293 synchronized(this)
294 {
295 _acceptorThread=new Thread[getAcceptors()];
296
297 for (int i=0;i<_acceptorThread.length;i++)
298 {
299 if (!_threadPool.dispatch(new Acceptor(i)))
300 {
301 Log.warn("insufficient maxThreads configured for {}",this);
302 break;
303 }
304 }
305 }
306
307 Log.info("Started {}",this);
308 }
309
310
311 protected void doStop() throws Exception
312 {
313 Log.info("Stopped {}",this);
314 try{close();} catch(IOException e) {Log.warn(e);}
315
316 if (_threadPool==_server.getThreadPool())
317 _threadPool=null;
318 else if (_threadPool instanceof LifeCycle)
319 ((LifeCycle)_threadPool).stop();
320
321 super.doStop();
322
323 Thread[] acceptors=null;
324 synchronized(this)
325 {
326 acceptors=_acceptorThread;
327 _acceptorThread=null;
328 }
329 if (acceptors != null)
330 {
331 for (int i=0;i<acceptors.length;i++)
332 {
333 Thread thread=acceptors[i];
334 if (thread!=null)
335 thread.interrupt();
336 }
337 }
338
339 }
340
341
342 public void join() throws InterruptedException
343 {
344 Thread[] threads=_acceptorThread;
345 if (threads!=null)
346 for (int i=0;i<threads.length;i++)
347 if (threads[i]!=null)
348 threads[i].join();
349 }
350
351
352 protected void configure(Socket socket)
353 throws IOException
354 {
355 try
356 {
357 socket.setTcpNoDelay(true);
358 if (_maxIdleTime >= 0)
359 socket.setSoTimeout(_maxIdleTime);
360 if (_soLingerTime >= 0)
361 socket.setSoLinger(true, _soLingerTime/1000);
362 else
363 socket.setSoLinger(false, 0);
364 }
365 catch (Exception e)
366 {
367 Log.ignore(e);
368 }
369 }
370
371
372
373 public void customize(EndPoint endpoint, Request request)
374 throws IOException
375 {
376 if (isForwarded())
377 checkForwardedHeaders(endpoint, request);
378 }
379
380
381 protected void checkForwardedHeaders(EndPoint endpoint, Request request)
382 throws IOException
383 {
384 HttpFields httpFields = request.getConnection().getRequestFields();
385
386
387 String forwardedHost = getLeftMostValue(httpFields.getStringField(getForwardedHostHeader()));
388 String forwardedServer = getLeftMostValue(httpFields.getStringField(getForwardedServerHeader()));
389 String forwardedFor = getLeftMostValue(httpFields.getStringField(getForwardedForHeader()));
390
391 if (_hostHeader!=null)
392 {
393
394 httpFields.put(HttpHeaders.HOST_BUFFER, _hostHeader);
395 request.setServerName(null);
396 request.setServerPort(-1);
397 request.getServerName();
398 }
399 else if (forwardedHost != null)
400 {
401
402 httpFields.put(HttpHeaders.HOST_BUFFER, forwardedHost);
403 request.setServerName(null);
404 request.setServerPort(-1);
405 request.getServerName();
406 }
407 else if (forwardedServer != null)
408 {
409
410 request.setServerName(forwardedServer);
411 }
412
413 if (forwardedFor != null)
414 {
415 request.setRemoteAddr(forwardedFor);
416 InetAddress inetAddress = null;
417
418 if (_useDNS)
419 {
420 try
421 {
422 inetAddress = InetAddress.getByName(forwardedFor);
423 }
424 catch (UnknownHostException e)
425 {
426 Log.ignore(e);
427 }
428 }
429
430 request.setRemoteHost(inetAddress==null?forwardedFor:inetAddress.getHostName());
431 }
432 }
433
434
435 protected String getLeftMostValue(String headerValue) {
436 if (headerValue == null)
437 return null;
438
439 int commaIndex = headerValue.indexOf(',');
440
441 if (commaIndex == -1)
442 {
443
444 return headerValue;
445 }
446
447
448 return headerValue.substring(0, commaIndex);
449 }
450
451
452 public void persist(EndPoint endpoint)
453 throws IOException
454 {
455 }
456
457
458
459
460
461
462
463 public int getConfidentialPort()
464 {
465 return _confidentialPort;
466 }
467
468
469
470
471
472
473 public String getConfidentialScheme()
474 {
475 return _confidentialScheme;
476 }
477
478
479
480
481
482 public boolean isIntegral(Request request)
483 {
484 return false;
485 }
486
487
488
489
490
491 public int getIntegralPort()
492 {
493 return _integralPort;
494 }
495
496
497
498
499
500 public String getIntegralScheme()
501 {
502 return _integralScheme;
503 }
504
505
506
507
508
509 public boolean isConfidential(Request request)
510 {
511 return false;
512 }
513
514
515
516
517
518 public void setConfidentialPort(int confidentialPort)
519 {
520 _confidentialPort = confidentialPort;
521 }
522
523
524
525
526
527 public void setConfidentialScheme(String confidentialScheme)
528 {
529 _confidentialScheme = confidentialScheme;
530 }
531
532
533
534
535
536 public void setIntegralPort(int integralPort)
537 {
538 _integralPort = integralPort;
539 }
540
541
542
543
544
545 public void setIntegralScheme(String integralScheme)
546 {
547 _integralScheme = integralScheme;
548 }
549
550
551 public Continuation newContinuation()
552 {
553 return new WaitingContinuation();
554 }
555
556
557 protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
558
559
560 public void stopAccept(int acceptorID) throws Exception
561 {
562 }
563
564
565 public boolean getResolveNames()
566 {
567 return _useDNS;
568 }
569
570
571 public void setResolveNames(boolean resolve)
572 {
573 _useDNS=resolve;
574 }
575
576
577
578
579
580
581 public boolean isForwarded()
582 {
583 return _forwarded;
584 }
585
586
587
588
589
590
591 public void setForwarded(boolean check)
592 {
593 if (check)
594 Log.debug(this+" is forwarded");
595 _forwarded=check;
596 }
597
598
599 public String getHostHeader()
600 {
601 return _hostHeader;
602 }
603
604
605
606
607
608
609
610
611 public void setHostHeader(String hostHeader)
612 {
613 _hostHeader=hostHeader;
614 }
615
616
617 public String getForwardedHostHeader()
618 {
619 return _forwardedHostHeader;
620 }
621
622
623
624
625
626 public void setForwardedHostHeader(String forwardedHostHeader)
627 {
628 _forwardedHostHeader=forwardedHostHeader;
629 }
630
631
632 public String getForwardedServerHeader()
633 {
634 return _forwardedServerHeader;
635 }
636
637
638
639
640
641 public void setForwardedServerHeader(String forwardedServerHeader)
642 {
643 _forwardedServerHeader=forwardedServerHeader;
644 }
645
646
647 public String getForwardedForHeader()
648 {
649 return _forwardedForHeader;
650 }
651
652
653
654
655
656 public void setForwardedForHeader(String forwardedRemoteAddressHeader)
657 {
658 _forwardedForHeader=forwardedRemoteAddressHeader;
659 }
660
661
662 public String toString()
663 {
664 String name = this.getClass().getName();
665 int dot = name.lastIndexOf('.');
666 if (dot>0)
667 name=name.substring(dot+1);
668
669 return name+"@"+(getHost()==null?"0.0.0.0":getHost())+":"+(getLocalPort()<=0?getPort():getLocalPort());
670 }
671
672
673
674
675
676 private class Acceptor implements Runnable
677 {
678 int _acceptor=0;
679
680 Acceptor(int id)
681 {
682 _acceptor=id;
683 }
684
685
686 public void run()
687 {
688 Thread current = Thread.currentThread();
689 String name;
690 synchronized(AbstractConnector.this)
691 {
692 if (_acceptorThread==null)
693 return;
694
695 _acceptorThread[_acceptor]=current;
696 name =_acceptorThread[_acceptor].getName();
697 current.setName(name+" - Acceptor"+_acceptor+" "+AbstractConnector.this);
698 }
699 int old_priority=current.getPriority();
700
701 try
702 {
703 current.setPriority(old_priority-_acceptorPriorityOffset);
704 while (isRunning() && getConnection()!=null)
705 {
706 try
707 {
708 accept(_acceptor);
709 }
710 catch(EofException e)
711 {
712 Log.ignore(e);
713 }
714 catch(IOException e)
715 {
716 Log.ignore(e);
717 }
718 catch(ThreadDeath e)
719 {
720 throw e;
721 }
722 catch(Throwable e)
723 {
724 Log.warn(e);
725 }
726 }
727 }
728 finally
729 {
730 current.setPriority(old_priority);
731 current.setName(name);
732
733 synchronized(AbstractConnector.this)
734 {
735 if (_acceptorThread!=null)
736 _acceptorThread[_acceptor]=null;
737 }
738 }
739 }
740 }
741
742
743 public String getName()
744 {
745 if (_name==null)
746 _name= (getHost()==null?"0.0.0.0":getHost())+":"+(getLocalPort()<=0?getPort():getLocalPort());
747 return _name;
748 }
749
750
751 public void setName(String name)
752 {
753 _name = name;
754 }
755
756
757
758
759
760
761
762
763
764 public int getRequests() {return _requests;}
765
766
767
768
769
770 public long getConnectionsDurationMin()
771 {
772 return _connectionsDurationMin;
773 }
774
775
776
777
778
779 public long getConnectionsDurationTotal()
780 {
781 return _connectionsDurationTotal;
782 }
783
784
785
786
787
788 public int getConnectionsOpenMin()
789 {
790 return _connectionsOpenMin;
791 }
792
793
794
795
796
797 public int getConnectionsRequestsMin()
798 {
799 return _connectionsRequestsMin;
800 }
801
802
803
804
805
806
807
808 public int getConnections() {return _connections;}
809
810
811
812
813
814
815 public int getConnectionsOpen() {return _connectionsOpen;}
816
817
818
819
820
821
822 public int getConnectionsOpenMax() {return _connectionsOpenMax;}
823
824
825
826
827
828
829 public long getConnectionsDurationAve() {return _connections==0?0:(_connectionsDurationTotal/_connections);}
830
831
832
833
834
835
836 public long getConnectionsDurationMax() {return _connectionsDurationMax;}
837
838
839
840
841
842
843 public int getConnectionsRequestsAve() {return _connections==0?0:(_requests/_connections);}
844
845
846
847
848
849
850 public int getConnectionsRequestsMax() {return _connectionsRequestsMax;}
851
852
853
854
855
856
857 public void statsReset()
858 {
859 _statsStartedAt=_statsStartedAt==-1?-1:System.currentTimeMillis();
860
861 _connections=0;
862
863 _connectionsOpenMin=_connectionsOpen;
864 _connectionsOpenMax=_connectionsOpen;
865 _connectionsOpen=0;
866
867 _connectionsDurationMin=0;
868 _connectionsDurationMax=0;
869 _connectionsDurationTotal=0;
870
871 _requests=0;
872
873 _connectionsRequestsMin=0;
874 _connectionsRequestsMax=0;
875 }
876
877
878 public void setStatsOn(boolean on)
879 {
880 if (on && _statsStartedAt!=-1)
881 return;
882 Log.debug("Statistics on = "+on+" for "+this);
883 statsReset();
884 _statsStartedAt=on?System.currentTimeMillis():-1;
885 }
886
887
888
889
890
891 public boolean getStatsOn()
892 {
893 return _statsStartedAt!=-1;
894 }
895
896
897
898
899
900 public long getStatsOnMs()
901 {
902 return (_statsStartedAt!=-1)?(System.currentTimeMillis()-_statsStartedAt):0;
903 }
904
905
906 protected void connectionOpened(HttpConnection connection)
907 {
908 if (_statsStartedAt==-1)
909 return;
910 synchronized(_statsLock)
911 {
912 _connectionsOpen++;
913 if (_connectionsOpen > _connectionsOpenMax)
914 _connectionsOpenMax=_connectionsOpen;
915 }
916 }
917
918
919 protected void connectionClosed(HttpConnection connection)
920 {
921 if (_statsStartedAt>=0)
922 {
923 long duration=System.currentTimeMillis()-connection.getTimeStamp();
924 int requests=connection.getRequests();
925 synchronized(_statsLock)
926 {
927 _requests+=requests;
928 _connections++;
929 _connectionsOpen--;
930 _connectionsDurationTotal+=duration;
931 if (_connectionsOpen<0)
932 _connectionsOpen=0;
933 if (_connectionsOpen<_connectionsOpenMin)
934 _connectionsOpenMin=_connectionsOpen;
935 if (_connectionsDurationMin==0 || duration<_connectionsDurationMin)
936 _connectionsDurationMin=duration;
937 if (duration>_connectionsDurationMax)
938 _connectionsDurationMax=duration;
939 if (_connectionsRequestsMin==0 || requests<_connectionsRequestsMin)
940 _connectionsRequestsMin=requests;
941 if (requests>_connectionsRequestsMax)
942 _connectionsRequestsMax=requests;
943 }
944 }
945
946 connection.destroy();
947 }
948
949
950
951
952
953 public int getAcceptorPriorityOffset()
954 {
955 return _acceptorPriorityOffset;
956 }
957
958
959
960
961
962
963
964
965 public void setAcceptorPriorityOffset(int offset)
966 {
967 _acceptorPriorityOffset=offset;
968 }
969
970
971
972
973
974 public boolean getReuseAddress()
975 {
976 return _reuseAddress;
977 }
978
979
980
981
982
983 public void setReuseAddress(boolean reuseAddress)
984 {
985 _reuseAddress=reuseAddress;
986 }
987
988 }