1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.util.Iterator;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.jboss.netty.channel.ChannelEvent;
36 import org.jboss.netty.channel.ChannelException;
37 import org.jboss.netty.channel.ChannelFuture;
38 import org.jboss.netty.channel.ChannelFutureListener;
39 import org.jboss.netty.channel.ChannelPipeline;
40 import org.jboss.netty.channel.ChannelState;
41 import org.jboss.netty.channel.ChannelStateEvent;
42 import org.jboss.netty.channel.MessageEvent;
43 import org.jboss.netty.logging.InternalLogger;
44 import org.jboss.netty.logging.InternalLoggerFactory;
45 import org.jboss.netty.util.ThreadRenamingRunnable;
46 import org.jboss.netty.util.internal.DeadLockProofWorker;
47
48 class NioClientSocketPipelineSink extends AbstractNioChannelSink {
49
50 private static final AtomicInteger nextId = new AtomicInteger();
51
52 static final InternalLogger logger =
53 InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
54
55 final Executor bossExecutor;
56
57 final int id = nextId.incrementAndGet();
58 private final Boss[] bosses;
59
60 private final AtomicInteger bossIndex = new AtomicInteger();
61
62 private final WorkerPool<NioWorker> workerPool;
63
64 NioClientSocketPipelineSink(
65 Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
66
67 this.bossExecutor = bossExecutor;
68
69 bosses = new Boss[bossCount];
70 for (int i = 0; i < bosses.length; i ++) {
71 bosses[i] = new Boss(i);
72 }
73
74 this.workerPool = workerPool;
75 }
76
77 public void eventSunk(
78 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
79 if (e instanceof ChannelStateEvent) {
80 ChannelStateEvent event = (ChannelStateEvent) e;
81 NioClientSocketChannel channel =
82 (NioClientSocketChannel) event.getChannel();
83 ChannelFuture future = event.getFuture();
84 ChannelState state = event.getState();
85 Object value = event.getValue();
86
87 switch (state) {
88 case OPEN:
89 if (Boolean.FALSE.equals(value)) {
90 channel.worker.close(channel, future);
91 }
92 break;
93 case BOUND:
94 if (value != null) {
95 bind(channel, future, (SocketAddress) value);
96 } else {
97 channel.worker.close(channel, future);
98 }
99 break;
100 case CONNECTED:
101 if (value != null) {
102 connect(channel, future, (SocketAddress) value);
103 } else {
104 channel.worker.close(channel, future);
105 }
106 break;
107 case INTEREST_OPS:
108 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
109 break;
110 }
111 } else if (e instanceof MessageEvent) {
112 MessageEvent event = (MessageEvent) e;
113 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
114 boolean offered = channel.writeBufferQueue.offer(event);
115 assert offered;
116 channel.worker.writeFromUserCode(channel);
117 }
118 }
119
120 private static void bind(
121 NioClientSocketChannel channel, ChannelFuture future,
122 SocketAddress localAddress) {
123 try {
124 channel.channel.socket().bind(localAddress);
125 channel.boundManually = true;
126 channel.setBound();
127 future.setSuccess();
128 fireChannelBound(channel, channel.getLocalAddress());
129 } catch (Throwable t) {
130 future.setFailure(t);
131 fireExceptionCaught(channel, t);
132 }
133 }
134
135 private void connect(
136 final NioClientSocketChannel channel, final ChannelFuture cf,
137 SocketAddress remoteAddress) {
138 try {
139 if (channel.channel.connect(remoteAddress)) {
140 channel.worker.register(channel, cf);
141 } else {
142 channel.getCloseFuture().addListener(new ChannelFutureListener() {
143 public void operationComplete(ChannelFuture f)
144 throws Exception {
145 if (!cf.isDone()) {
146 cf.setFailure(new ClosedChannelException());
147 }
148 }
149 });
150 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
151 channel.connectFuture = cf;
152 nextBoss().register(channel);
153 }
154
155 } catch (Throwable t) {
156 cf.setFailure(t);
157 fireExceptionCaught(channel, t);
158 channel.worker.close(channel, succeededFuture(channel));
159 }
160 }
161
162 Boss nextBoss() {
163 return bosses[Math.abs(
164 bossIndex.getAndIncrement() % bosses.length)];
165 }
166
167 NioWorker nextWorker() {
168 return workerPool.nextWorker();
169 }
170
171 private final class Boss implements Runnable {
172
173 volatile Selector selector;
174 private boolean started;
175 private final AtomicBoolean wakenUp = new AtomicBoolean();
176 private final Object startStopLock = new Object();
177 private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
178 private final int subId;
179
180 Boss(int subId) {
181 this.subId = subId;
182 }
183
184 void register(NioClientSocketChannel channel) {
185 Runnable registerTask = new RegisterTask(this, channel);
186 Selector selector;
187
188 synchronized (startStopLock) {
189 if (!started) {
190
191 try {
192 this.selector = selector = Selector.open();
193 } catch (Throwable t) {
194 throw new ChannelException(
195 "Failed to create a selector.", t);
196 }
197
198
199 boolean success = false;
200 try {
201 DeadLockProofWorker.start(bossExecutor,
202 new ThreadRenamingRunnable(this,
203 "New I/O client boss #" + id + '-' + subId));
204
205 success = true;
206 } finally {
207 if (!success) {
208
209 try {
210 selector.close();
211 } catch (Throwable t) {
212 if (logger.isWarnEnabled()) {
213 logger.warn("Failed to close a selector.", t);
214 }
215 }
216 this.selector = selector = null;
217
218 }
219 }
220 } else {
221
222 selector = this.selector;
223 }
224
225 assert selector != null && selector.isOpen();
226
227 started = true;
228 boolean offered = registerTaskQueue.offer(registerTask);
229 assert offered;
230 }
231
232 if (wakenUp.compareAndSet(false, true)) {
233 selector.wakeup();
234 }
235 }
236
237 public void run() {
238 boolean shutdown = false;
239 Selector selector = this.selector;
240 long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
241 for (;;) {
242 wakenUp.set(false);
243
244 try {
245 int selectedKeyCount = selector.select(10);
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 if (wakenUp.get()) {
276 selector.wakeup();
277 }
278
279 processRegisterTaskQueue();
280
281 if (selectedKeyCount > 0) {
282 processSelectedKeys(selector.selectedKeys());
283 }
284
285
286 long currentTimeNanos = System.nanoTime();
287 if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
288 lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
289 processConnectTimeout(selector.keys(), currentTimeNanos);
290 }
291
292
293
294
295
296
297 if (selector.keys().isEmpty()) {
298 if (shutdown ||
299 bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
300
301 synchronized (startStopLock) {
302 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
303 started = false;
304 try {
305 selector.close();
306 } catch (IOException e) {
307 if (logger.isWarnEnabled()) {
308 logger.warn(
309 "Failed to close a selector.", e);
310 }
311
312 } finally {
313 this.selector = null;
314 }
315 break;
316 } else {
317 shutdown = false;
318 }
319 }
320 } else {
321
322 shutdown = true;
323 }
324 } else {
325 shutdown = false;
326 }
327 } catch (Throwable t) {
328 if (logger.isWarnEnabled()) {
329 logger.warn(
330 "Unexpected exception in the selector loop.", t);
331 }
332
333
334
335 try {
336 Thread.sleep(1000);
337 } catch (InterruptedException e) {
338
339 }
340 }
341 }
342 }
343
344 private void processRegisterTaskQueue() {
345 for (;;) {
346 final Runnable task = registerTaskQueue.poll();
347 if (task == null) {
348 break;
349 }
350
351 task.run();
352 }
353 }
354
355 private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
356 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
357 SelectionKey k = i.next();
358 i.remove();
359
360 if (!k.isValid()) {
361 close(k);
362 continue;
363 }
364
365 if (k.isConnectable()) {
366 connect(k);
367 }
368 }
369 }
370
371 private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
372 ConnectException cause = null;
373 for (SelectionKey k: keys) {
374 if (!k.isValid()) {
375
376
377
378
379
380
381
382
383 continue;
384 }
385
386 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
387 if (ch.connectDeadlineNanos > 0 &&
388 currentTimeNanos >= ch.connectDeadlineNanos) {
389
390 if (cause == null) {
391 cause = new ConnectException("connection timed out");
392 }
393
394 ch.connectFuture.setFailure(cause);
395 fireExceptionCaught(ch, cause);
396 ch.worker.close(ch, succeededFuture(ch));
397 }
398 }
399 }
400
401 private void connect(SelectionKey k) {
402 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
403 try {
404 if (ch.channel.finishConnect()) {
405 k.cancel();
406 ch.worker.register(ch, ch.connectFuture);
407 }
408 } catch (Throwable t) {
409 ch.connectFuture.setFailure(t);
410 fireExceptionCaught(ch, t);
411 k.cancel();
412 ch.worker.close(ch, succeededFuture(ch));
413 }
414 }
415
416 private void close(SelectionKey k) {
417 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
418 ch.worker.close(ch, succeededFuture(ch));
419 }
420 }
421
422 private static final class RegisterTask implements Runnable {
423 private final Boss boss;
424 private final NioClientSocketChannel channel;
425
426 RegisterTask(Boss boss, NioClientSocketChannel channel) {
427 this.boss = boss;
428 this.channel = channel;
429 }
430
431 public void run() {
432 try {
433 channel.channel.register(
434 boss.selector, SelectionKey.OP_CONNECT, channel);
435 } catch (ClosedChannelException e) {
436 channel.worker.close(channel, succeededFuture(channel));
437 }
438
439 int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
440 if (connectTimeout > 0) {
441 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
442 }
443 }
444 }
445 }