1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.nio;
16
17 import java.io.IOException;
18 import java.net.InetSocketAddress;
19 import java.net.Socket;
20 import java.nio.channels.ByteChannel;
21 import java.nio.channels.ServerSocketChannel;
22 import java.nio.channels.SocketChannel;
23
24 import org.mortbay.io.EndPoint;
25 import org.mortbay.io.nio.ChannelEndPoint;
26 import org.mortbay.jetty.EofException;
27 import org.mortbay.jetty.HttpConnection;
28 import org.mortbay.jetty.HttpException;
29 import org.mortbay.jetty.Request;
30 import org.mortbay.log.Log;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public class BlockingChannelConnector extends AbstractNIOConnector
46 {
47 private transient ServerSocketChannel _acceptChannel;
48
49
50
51
52
53 public BlockingChannelConnector()
54 {
55 }
56
57
58 public Object getConnection()
59 {
60 return _acceptChannel;
61 }
62
63
64 public void open() throws IOException
65 {
66
67 _acceptChannel= ServerSocketChannel.open();
68 _acceptChannel.configureBlocking(true);
69
70
71 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
72 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
73 }
74
75
76 public void close() throws IOException
77 {
78 if (_acceptChannel != null)
79 _acceptChannel.close();
80 _acceptChannel=null;
81 }
82
83
84 public void accept(int acceptorID)
85 throws IOException, InterruptedException
86 {
87 SocketChannel channel = _acceptChannel.accept();
88 channel.configureBlocking(true);
89 Socket socket=channel.socket();
90 configure(socket);
91
92 Connection connection=new Connection(channel);
93 connection.dispatch();
94 }
95
96
97 public void customize(EndPoint endpoint, Request request)
98 throws IOException
99 {
100 Connection connection = (Connection)endpoint;
101 if (connection._sotimeout!=_maxIdleTime)
102 {
103 connection._sotimeout=_maxIdleTime;
104 ((SocketChannel)endpoint.getTransport()).socket().setSoTimeout(_maxIdleTime);
105 }
106
107 super.customize(endpoint, request);
108 configure(((SocketChannel)endpoint.getTransport()).socket());
109 }
110
111
112
113 public int getLocalPort()
114 {
115 if (_acceptChannel==null || !_acceptChannel.isOpen())
116 return -1;
117 return _acceptChannel.socket().getLocalPort();
118 }
119
120
121
122
123 private class Connection extends ChannelEndPoint implements Runnable
124 {
125 boolean _dispatched=false;
126 HttpConnection _connection;
127 int _sotimeout;
128
129 Connection(ByteChannel channel)
130 {
131 super(channel);
132 _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
133 }
134
135 void dispatch() throws IOException
136 {
137 if (!getThreadPool().dispatch(this))
138 {
139 Log.warn("dispatch failed for {}",_connection);
140 close();
141 }
142 }
143
144 public void run()
145 {
146 try
147 {
148 connectionOpened(_connection);
149
150 while (isOpen())
151 {
152 if (_connection.isIdle())
153 {
154 if (getServer().getThreadPool().isLowOnThreads())
155 {
156 int lrmit = getLowResourceMaxIdleTime();
157 if (lrmit>=0 && _sotimeout!= lrmit)
158 {
159 _sotimeout=lrmit;
160 ((SocketChannel)getTransport()).socket().setSoTimeout(_sotimeout);
161 }
162 }
163 }
164 _connection.handle();
165 }
166 }
167 catch (EofException e)
168 {
169 Log.debug("EOF", e);
170 try{close();}
171 catch(IOException e2){Log.ignore(e2);}
172 }
173 catch (HttpException e)
174 {
175 Log.debug("BAD", e);
176 try{close();}
177 catch(IOException e2){Log.ignore(e2);}
178 }
179 catch(Throwable e)
180 {
181 Log.warn("handle failed",e);
182 try{close();}
183 catch(IOException e2){Log.ignore(e2);}
184 }
185 finally
186 {
187 connectionClosed(_connection);
188 }
189 }
190 }
191 }