1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.net.SocketTimeoutException;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.ClosedSelectorException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.jboss.netty.channel.AbstractChannelSink;
33 import org.jboss.netty.channel.Channel;
34 import org.jboss.netty.channel.ChannelEvent;
35 import org.jboss.netty.channel.ChannelFuture;
36 import org.jboss.netty.channel.ChannelPipeline;
37 import org.jboss.netty.channel.ChannelState;
38 import org.jboss.netty.channel.ChannelStateEvent;
39 import org.jboss.netty.channel.MessageEvent;
40 import org.jboss.netty.logging.InternalLogger;
41 import org.jboss.netty.logging.InternalLoggerFactory;
42 import org.jboss.netty.util.ThreadRenamingRunnable;
43 import org.jboss.netty.util.internal.IoWorkerRunnable;
44
45
46
47
48
49
50
51
52
53 class NioServerSocketPipelineSink extends AbstractChannelSink {
54
55 static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
57 private static final AtomicInteger nextId = new AtomicInteger();
58
59 private final int id = nextId.incrementAndGet();
60 private final NioWorker[] workers;
61 private final AtomicInteger workerIndex = new AtomicInteger();
62
63 NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
64 workers = new NioWorker[workerCount];
65 for (int i = 0; i < workers.length; i ++) {
66 workers[i] = new NioWorker(id, i + 1, workerExecutor);
67 }
68 }
69
70 public void eventSunk(
71 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
72 Channel channel = e.getChannel();
73 if (channel instanceof NioServerSocketChannel) {
74 handleServerSocket(e);
75 } else if (channel instanceof NioSocketChannel) {
76 handleAcceptedSocket(e);
77 }
78 }
79
80 private void handleServerSocket(ChannelEvent e) {
81 if (!(e instanceof ChannelStateEvent)) {
82 return;
83 }
84
85 ChannelStateEvent event = (ChannelStateEvent) e;
86 NioServerSocketChannel channel =
87 (NioServerSocketChannel) event.getChannel();
88 ChannelFuture future = event.getFuture();
89 ChannelState state = event.getState();
90 Object value = event.getValue();
91
92 switch (state) {
93 case OPEN:
94 if (Boolean.FALSE.equals(value)) {
95 close(channel, future);
96 }
97 break;
98 case BOUND:
99 if (value != null) {
100 bind(channel, future, (SocketAddress) value);
101 } else {
102 close(channel, future);
103 }
104 break;
105 }
106 }
107
108 private void handleAcceptedSocket(ChannelEvent e) {
109 if (e instanceof ChannelStateEvent) {
110 ChannelStateEvent event = (ChannelStateEvent) e;
111 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
112 ChannelFuture future = event.getFuture();
113 ChannelState state = event.getState();
114 Object value = event.getValue();
115
116 switch (state) {
117 case OPEN:
118 if (Boolean.FALSE.equals(value)) {
119 channel.worker.close(channel, future);
120 }
121 break;
122 case BOUND:
123 case CONNECTED:
124 if (value == null) {
125 channel.worker.close(channel, future);
126 }
127 break;
128 case INTEREST_OPS:
129 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
130 break;
131 }
132 } else if (e instanceof MessageEvent) {
133 MessageEvent event = (MessageEvent) e;
134 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
135 boolean offered = channel.writeBuffer.offer(event);
136 assert offered;
137 channel.worker.writeFromUserCode(channel);
138 }
139 }
140
141 private void bind(
142 NioServerSocketChannel channel, ChannelFuture future,
143 SocketAddress localAddress) {
144
145 boolean bound = false;
146 boolean bossStarted = false;
147 try {
148 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
149 bound = true;
150
151 future.setSuccess();
152 fireChannelBound(channel, channel.getLocalAddress());
153
154 Executor bossExecutor =
155 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
156 bossExecutor.execute(
157 new IoWorkerRunnable(
158 new ThreadRenamingRunnable(
159 new Boss(channel),
160 "New I/O server boss #" + id +
161 " (" + channel + ')')));
162 bossStarted = true;
163 } catch (Throwable t) {
164 future.setFailure(t);
165 fireExceptionCaught(channel, t);
166 } finally {
167 if (!bossStarted && bound) {
168 close(channel, future);
169 }
170 }
171 }
172
173 private void close(NioServerSocketChannel channel, ChannelFuture future) {
174 boolean bound = channel.isBound();
175 try {
176 if (channel.socket.isOpen()) {
177 channel.socket.close();
178 Selector selector = channel.selector;
179 if (selector != null) {
180 selector.wakeup();
181 }
182 }
183
184
185
186
187 channel.shutdownLock.lock();
188 try {
189 if (channel.setClosed()) {
190 future.setSuccess();
191 if (bound) {
192 fireChannelUnbound(channel);
193 }
194 fireChannelClosed(channel);
195 } else {
196 future.setSuccess();
197 }
198 } finally {
199 channel.shutdownLock.unlock();
200 }
201 } catch (Throwable t) {
202 future.setFailure(t);
203 fireExceptionCaught(channel, t);
204 }
205 }
206
207 NioWorker nextWorker() {
208 return workers[Math.abs(
209 workerIndex.getAndIncrement() % workers.length)];
210 }
211
212 private final class Boss implements Runnable {
213 private final Selector selector;
214 private final NioServerSocketChannel channel;
215
216 Boss(NioServerSocketChannel channel) throws IOException {
217 this.channel = channel;
218
219 selector = Selector.open();
220
221 boolean registered = false;
222 try {
223 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
224 registered = true;
225 } finally {
226 if (!registered) {
227 closeSelector();
228 }
229 }
230
231 channel.selector = selector;
232 }
233
234 public void run() {
235 final Thread currentThread = Thread.currentThread();
236
237 channel.shutdownLock.lock();
238 try {
239 for (;;) {
240 try {
241 if (selector.select(1000) > 0) {
242 selector.selectedKeys().clear();
243 }
244
245 SocketChannel acceptedSocket = channel.socket.accept();
246 if (acceptedSocket != null) {
247 registerAcceptedChannel(acceptedSocket, currentThread);
248 }
249 } catch (SocketTimeoutException e) {
250
251
252 } catch (CancelledKeyException e) {
253
254 } catch (ClosedSelectorException e) {
255
256 } catch (ClosedChannelException e) {
257
258 break;
259 } catch (Throwable e) {
260 logger.warn(
261 "Failed to accept a connection.", e);
262 try {
263 Thread.sleep(1000);
264 } catch (InterruptedException e1) {
265
266 }
267 }
268 }
269 } finally {
270 channel.shutdownLock.unlock();
271 closeSelector();
272 }
273 }
274
275 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
276 try {
277 ChannelPipeline pipeline =
278 channel.getConfig().getPipelineFactory().getPipeline();
279 NioWorker worker = nextWorker();
280 worker.register(new NioAcceptedSocketChannel(
281 channel.getFactory(), pipeline, channel,
282 NioServerSocketPipelineSink.this, acceptedSocket,
283 worker, currentThread), null);
284 } catch (Exception e) {
285 logger.warn(
286 "Failed to initialize an accepted socket.", e);
287 try {
288 acceptedSocket.close();
289 } catch (IOException e2) {
290 logger.warn(
291 "Failed to close a partially accepted socket.",
292 e2);
293 }
294 }
295 }
296
297 private void closeSelector() {
298 channel.selector = null;
299 try {
300 selector.close();
301 } catch (Exception e) {
302 logger.warn("Failed to close a selector.", e);
303 }
304 }
305 }
306 }