1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.net.SocketTimeoutException;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.ClosedSelectorException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.jboss.netty.channel.Channel;
33 import org.jboss.netty.channel.ChannelEvent;
34 import org.jboss.netty.channel.ChannelFuture;
35 import org.jboss.netty.channel.ChannelPipeline;
36 import org.jboss.netty.channel.ChannelState;
37 import org.jboss.netty.channel.ChannelStateEvent;
38 import org.jboss.netty.channel.MessageEvent;
39 import org.jboss.netty.logging.InternalLogger;
40 import org.jboss.netty.logging.InternalLoggerFactory;
41 import org.jboss.netty.util.ThreadRenamingRunnable;
42 import org.jboss.netty.util.internal.DeadLockProofWorker;
43
44 class NioServerSocketPipelineSink extends AbstractNioChannelSink {
45
46 private static final AtomicInteger nextId = new AtomicInteger();
47
48 static final InternalLogger logger =
49 InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
50
51 final int id = nextId.incrementAndGet();
52
53 private final WorkerPool<NioWorker> workerPool;
54
55 NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
56 this.workerPool = workerPool;
57 }
58
59
60 public void eventSunk(
61 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
62 Channel channel = e.getChannel();
63 if (channel instanceof NioServerSocketChannel) {
64 handleServerSocket(e);
65 } else if (channel instanceof NioSocketChannel) {
66 handleAcceptedSocket(e);
67 }
68 }
69
70 private void handleServerSocket(ChannelEvent e) {
71 if (!(e instanceof ChannelStateEvent)) {
72 return;
73 }
74
75 ChannelStateEvent event = (ChannelStateEvent) e;
76 NioServerSocketChannel channel =
77 (NioServerSocketChannel) event.getChannel();
78 ChannelFuture future = event.getFuture();
79 ChannelState state = event.getState();
80 Object value = event.getValue();
81
82 switch (state) {
83 case OPEN:
84 if (Boolean.FALSE.equals(value)) {
85 close(channel, future);
86 }
87 break;
88 case BOUND:
89 if (value != null) {
90 bind(channel, future, (SocketAddress) value);
91 } else {
92 close(channel, future);
93 }
94 break;
95 default:
96 break;
97 }
98 }
99
100 private static void handleAcceptedSocket(ChannelEvent e) {
101 if (e instanceof ChannelStateEvent) {
102 ChannelStateEvent event = (ChannelStateEvent) e;
103 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
104 ChannelFuture future = event.getFuture();
105 ChannelState state = event.getState();
106 Object value = event.getValue();
107
108 switch (state) {
109 case OPEN:
110 if (Boolean.FALSE.equals(value)) {
111 channel.worker.close(channel, future);
112 }
113 break;
114 case BOUND:
115 case CONNECTED:
116 if (value == null) {
117 channel.worker.close(channel, future);
118 }
119 break;
120 case INTEREST_OPS:
121 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
122 break;
123 }
124 } else if (e instanceof MessageEvent) {
125 MessageEvent event = (MessageEvent) e;
126 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
127 boolean offered = channel.writeBufferQueue.offer(event);
128 assert offered;
129 channel.worker.writeFromUserCode(channel);
130 }
131 }
132
133 private void bind(
134 NioServerSocketChannel channel, ChannelFuture future,
135 SocketAddress localAddress) {
136
137 boolean bound = false;
138 boolean bossStarted = false;
139 try {
140 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
141 bound = true;
142
143 future.setSuccess();
144 fireChannelBound(channel, channel.getLocalAddress());
145
146 Executor bossExecutor =
147 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
148 DeadLockProofWorker.start(bossExecutor,
149 new ThreadRenamingRunnable(new Boss(channel),
150 "New I/O server boss #" + id + " (" + channel + ')'));
151 bossStarted = true;
152 } catch (Throwable t) {
153 future.setFailure(t);
154 fireExceptionCaught(channel, t);
155 } finally {
156 if (!bossStarted && bound) {
157 close(channel, future);
158 }
159 }
160 }
161
162 private static void close(NioServerSocketChannel channel, ChannelFuture future) {
163 boolean bound = channel.isBound();
164 try {
165 if (channel.socket.isOpen()) {
166 channel.socket.close();
167 Selector selector = channel.selector;
168 if (selector != null) {
169 selector.wakeup();
170 }
171 }
172
173
174
175
176 channel.shutdownLock.lock();
177 try {
178 if (channel.setClosed()) {
179 future.setSuccess();
180 if (bound) {
181 fireChannelUnbound(channel);
182 }
183 fireChannelClosed(channel);
184 } else {
185 future.setSuccess();
186 }
187 } finally {
188 channel.shutdownLock.unlock();
189 }
190 } catch (Throwable t) {
191 future.setFailure(t);
192 fireExceptionCaught(channel, t);
193 }
194 }
195
196 NioWorker nextWorker() {
197 return workerPool.nextWorker();
198 }
199
200 private final class Boss implements Runnable {
201 private final Selector selector;
202 private final NioServerSocketChannel channel;
203
204 Boss(NioServerSocketChannel channel) throws IOException {
205 this.channel = channel;
206
207 selector = Selector.open();
208
209 boolean registered = false;
210 try {
211 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
212 registered = true;
213 } finally {
214 if (!registered) {
215 closeSelector();
216 }
217 }
218
219 channel.selector = selector;
220 }
221
222 public void run() {
223 final Thread currentThread = Thread.currentThread();
224
225 channel.shutdownLock.lock();
226 try {
227 for (;;) {
228 try {
229 if (selector.select(1000) > 0) {
230
231
232 selector.selectedKeys().clear();
233 }
234
235
236 for (;;) {
237 SocketChannel acceptedSocket = channel.socket.accept();
238 if (acceptedSocket == null) {
239 break;
240 }
241 registerAcceptedChannel(acceptedSocket, currentThread);
242
243 }
244
245 } catch (SocketTimeoutException e) {
246
247
248 } catch (CancelledKeyException e) {
249
250 } catch (ClosedSelectorException e) {
251
252 } catch (ClosedChannelException e) {
253
254 break;
255 } catch (Throwable e) {
256 if (logger.isWarnEnabled()) {
257 logger.warn(
258 "Failed to accept a connection.", e);
259 }
260
261 try {
262 Thread.sleep(1000);
263 } catch (InterruptedException e1) {
264
265 }
266 }
267 }
268 } finally {
269 channel.shutdownLock.unlock();
270 closeSelector();
271 }
272 }
273
274 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
275 try {
276 ChannelPipeline pipeline =
277 channel.getConfig().getPipelineFactory().getPipeline();
278 NioWorker worker = nextWorker();
279 worker.register(new NioAcceptedSocketChannel(
280 channel.getFactory(), pipeline, channel,
281 NioServerSocketPipelineSink.this, acceptedSocket,
282 worker, currentThread), null);
283 } catch (Exception e) {
284 if (logger.isWarnEnabled()) {
285 logger.warn(
286 "Failed to initialize an accepted socket.", e);
287 }
288
289 try {
290 acceptedSocket.close();
291 } catch (IOException e2) {
292 if (logger.isWarnEnabled()) {
293 logger.warn(
294 "Failed to close a partially accepted socket.",
295 e2);
296 }
297
298 }
299 }
300 }
301
302 private void closeSelector() {
303 channel.selector = null;
304 try {
305 selector.close();
306 } catch (Exception e) {
307 if (logger.isWarnEnabled()) {
308 logger.warn("Failed to close a selector.", e);
309 }
310 }
311 }
312 }
313 }