1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.nio.channels.AsynchronousCloseException;
22 import java.nio.channels.CancelledKeyException;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.NotYetConnectedException;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.WritableByteChannel;
28 import java.util.Iterator;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.locks.ReadWriteLock;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38
39 import org.jboss.netty.channel.Channel;
40 import org.jboss.netty.channel.ChannelException;
41 import org.jboss.netty.channel.ChannelFuture;
42 import org.jboss.netty.channel.MessageEvent;
43 import org.jboss.netty.channel.socket.Worker;
44 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
45 import org.jboss.netty.logging.InternalLogger;
46 import org.jboss.netty.logging.InternalLoggerFactory;
47 import org.jboss.netty.util.ThreadRenamingRunnable;
48 import org.jboss.netty.util.internal.DeadLockProofWorker;
49
50 abstract class AbstractNioWorker implements Worker {
51
52
/** Counter from which each new worker instance draws its {@link #id}. */
53 private static final AtomicInteger nextId = new AtomicInteger();
54
/** Id of this worker; it is appended to the name given to the worker loop ("New I/O worker #id"). */
55 final int id = nextId.incrementAndGet();
56
57
58
59
/** Logger used for selector-close failures and unexpected selector-loop exceptions. */
60 private static final InternalLogger logger = InternalLoggerFactory
61 .getInstance(AbstractNioWorker.class);
62
/**
 * Value copied from {@link NioProviderMetadata#CONSTRAINT_LEVEL}. With 0 the interest ops of a
 * key are changed directly; with 1 or 2 changes made off the I/O thread go through
 * {@link #selectorGuard}. Any other value makes setInterestOps throw an {@link Error}.
 */
63 private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
64
/** Number of counted channel closes ({@link #cancelledKeys}) after which selectNow() is invoked. */
65 static final int CLEANUP_INTERVAL = 256;
66
67
68
69
70
71
/** Executor on which the selector loop of this worker is started. */
72 private final Executor executor;
73
74
75
76
/** Whether the selector loop has been started; read and written in start() and run(). */
77 private boolean started;
78
79
80
81
82
/** Thread currently executing run(); compared against the calling thread to detect the I/O thread. */
83 protected volatile Thread thread;
84
85
86
87
/** Selector opened by start(); reset to null when the loop closes it or when start-up fails. */
88 volatile Selector selector;
89
90
91
92
93
94
95
/**
 * Set to true (via compare-and-set) by code that is about to call {@link Selector#wakeup()},
 * and reset to false at the top of every selector-loop iteration.
 */
96 protected final AtomicBoolean wakenUp = new AtomicBoolean();
97
98
99
100
/**
 * Read lock is held while interest ops are changed off the I/O thread (CONSTRAINT_LEVEL 1 or 2);
 * the loop acquires and immediately releases the write lock before each select.
 */
101 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
102
103
104
105
/** Monitor held while the worker is started, while tasks are queued, and while the loop decides to stop. */
106 private final Object startStopLock = new Object();
107
108
109
110
/** Registration tasks queued by register() and run by the selector loop. */
111 private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
112
113
114
115
/** Write tasks drained by the selector loop; nothing in this file adds to it (presumably subclasses do). */
116 protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
117
/** Tasks queued by executeInIoThread() and run by the selector loop. */
118 private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
119
120
/** Incremented on every channel close; reset by the loop and by cleanUpCancelledKeys(). */
121 private volatile int cancelledKeys;
122
/** Pool from which write0() acquires the send buffer for each outgoing message. */
123 protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
124
/** When true, the loop may stop itself after finding no registered keys on two consecutive passes. */
125 private final boolean allowShutdownOnIdle;
126
/**
 * Creates a worker whose selector loop is allowed to shut itself down when idle.
 *
 * @param executor the {@link Executor} on which the selector loop is started
 */
AbstractNioWorker(Executor executor) {
    // Delegates with allowShutdownOnIdle enabled.
    this(executor, true);
}
130
131
132
133
134
/**
 * Creates a worker and records whether its selector loop may stop itself when idle.
 *
 * @param executor            the {@link Executor} on which the selector loop is started
 * @param allowShutdownOnIdle {@code true} to let the loop stop once no keys remain registered
 * @deprecated annotated {@code @Deprecated} in the source; NOTE(review): the intended
 *             replacement is not visible in this file.
 */
@Deprecated
public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
    this.allowShutdownOnIdle = allowShutdownOnIdle;
    this.executor = executor;
}
140
141 void register(AbstractNioChannel<?> channel, ChannelFuture future) {
142
143 Runnable registerTask = createRegisterTask(channel, future);
144
145 synchronized (startStopLock) {
146 Selector selector = start();
147
148
149 boolean offered = registerTaskQueue.offer(registerTask);
150 assert offered;
151
152 if (wakenUp.compareAndSet(false, true)) {
153
154 selector = this.selector;
155
156
157
158 if (selector != null) {
159 selector.wakeup();
160 }
161 }
162
163 }
164 }
165
166
167
168
169
170
171
172 private Selector start() {
173 if (!started) {
174
175 try {
176 selector = Selector.open();
177 } catch (Throwable t) {
178 throw new ChannelException("Failed to create a selector.", t);
179 }
180
181
182 boolean success = false;
183 try {
184 DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
185 success = true;
186 } finally {
187 if (!success) {
188
189 try {
190 selector.close();
191 } catch (Throwable t) {
192 logger.warn("Failed to close a selector.", t);
193 }
194 selector = null;
195
196 }
197 }
198 }
199
200 assert selector != null && selector.isOpen();
201
202 started = true;
203 return selector;
204 }
205
206
207 public void run() {
208 thread = Thread.currentThread();
209
210 boolean shutdown = false;
211 Selector selector = this.selector;
212 for (;;) {
213 wakenUp.set(false);
214
215 if (CONSTRAINT_LEVEL != 0) {
216 selectorGuard.writeLock().lock();
217
218
219 selectorGuard.writeLock().unlock();
220 }
221
222 try {
223 SelectorUtil.select(selector);
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253 if (wakenUp.get()) {
254 selector.wakeup();
255 }
256
257 cancelledKeys = 0;
258 processRegisterTaskQueue();
259 processEventQueue();
260 processWriteTaskQueue();
261 processSelectedKeys(selector.selectedKeys());
262
263
264
265
266
267
268 if (selector.keys().isEmpty()) {
269 if (shutdown ||
270 executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
271
272 synchronized (startStopLock) {
273 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
274 started = false;
275 try {
276 selector.close();
277 } catch (IOException e) {
278 logger.warn(
279 "Failed to close a selector.", e);
280 } finally {
281 this.selector = null;
282 }
283 break;
284 } else {
285 shutdown = false;
286 }
287 }
288 } else {
289 if (allowShutdownOnIdle) {
290
291 shutdown = true;
292 }
293 }
294 } else {
295 shutdown = false;
296 }
297 } catch (Throwable t) {
298 logger.warn(
299 "Unexpected exception in the selector loop.", t);
300
301
302
303 try {
304 Thread.sleep(1000);
305 } catch (InterruptedException e) {
306
307 }
308 }
309 }
310 }
311
312 public void executeInIoThread(Runnable task) {
313 executeInIoThread(task, false);
314 }
315
316
317
318
319
320
321
322
323
324
325 public void executeInIoThread(Runnable task, boolean alwaysAsync) {
326 if (!alwaysAsync && Thread.currentThread() == thread) {
327 task.run();
328 } else {
329 synchronized (startStopLock) {
330 start();
331 boolean added = eventQueue.offer(task);
332
333 assert added;
334 if (added) {
335
336 Selector selector = this.selector;
337 if (selector != null) {
338 selector.wakeup();
339 }
340 }
341 }
342
343 }
344
345 }
346
347
348 private void processRegisterTaskQueue() throws IOException {
349 for (;;) {
350 final Runnable task = registerTaskQueue.poll();
351 if (task == null) {
352 break;
353 }
354
355 task.run();
356 cleanUpCancelledKeys();
357 }
358 }
359
360 private void processWriteTaskQueue() throws IOException {
361 for (;;) {
362 final Runnable task = writeTaskQueue.poll();
363 if (task == null) {
364 break;
365 }
366
367 task.run();
368 cleanUpCancelledKeys();
369 }
370 }
371
372 private void processEventQueue() throws IOException {
373 for (;;) {
374 final Runnable task = eventQueue.poll();
375 if (task == null) {
376 break;
377 }
378 task.run();
379 cleanUpCancelledKeys();
380 }
381 }
382
383 private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
384 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
385 SelectionKey k = i.next();
386 i.remove();
387 try {
388 int readyOps = k.readyOps();
389 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
390 if (!read(k)) {
391
392 continue;
393 }
394 }
395 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
396 writeFromSelectorLoop(k);
397 }
398 } catch (CancelledKeyException e) {
399 close(k);
400 }
401
402 if (cleanUpCancelledKeys()) {
403 break;
404 }
405 }
406 }
407
408 private boolean cleanUpCancelledKeys() throws IOException {
409 if (cancelledKeys >= CLEANUP_INTERVAL) {
410 cancelledKeys = 0;
411 selector.selectNow();
412 return true;
413 }
414 return false;
415 }
416
417
418
419 private void close(SelectionKey k) {
420 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
421 close(ch, succeededFuture(ch));
422 }
423
424 void writeFromUserCode(final AbstractNioChannel<?> channel) {
425 if (!channel.isConnected()) {
426 cleanUpWriteBuffer(channel);
427 return;
428 }
429
430 if (scheduleWriteIfNecessary(channel)) {
431 return;
432 }
433
434
435
436 if (channel.writeSuspended) {
437 return;
438 }
439
440 if (channel.inWriteNowLoop) {
441 return;
442 }
443
444 write0(channel);
445 }
446
447 void writeFromTaskLoop(AbstractNioChannel<?> ch) {
448 if (!ch.writeSuspended) {
449 write0(ch);
450 }
451 }
452
453 void writeFromSelectorLoop(final SelectionKey k) {
454 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
455 ch.writeSuspended = false;
456 write0(ch);
457 }
458
/**
 * Hook consulted by writeFromUserCode() before it writes directly.
 *
 * @param channel the channel that has pending messages
 * @return {@code true} to make writeFromUserCode() return without writing
 *         (presumably because the write was scheduled elsewhere — confirm in subclasses)
 */
459 protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel);
460
461 protected void write0(AbstractNioChannel<?> channel) {
462 boolean open = true;
463 boolean addOpWrite = false;
464 boolean removeOpWrite = false;
465 boolean iothread = isIoThread(channel);
466
467 long writtenBytes = 0;
468
469 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
470 final WritableByteChannel ch = channel.channel;
471 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
472 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
473 synchronized (channel.writeLock) {
474 channel.inWriteNowLoop = true;
475 for (;;) {
476 MessageEvent evt = channel.currentWriteEvent;
477 SendBuffer buf;
478 if (evt == null) {
479 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
480 removeOpWrite = true;
481 channel.writeSuspended = false;
482 break;
483 }
484
485 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
486 } else {
487 buf = channel.currentWriteBuffer;
488 }
489
490 ChannelFuture future = evt.getFuture();
491 try {
492 long localWrittenBytes = 0;
493 for (int i = writeSpinCount; i > 0; i --) {
494 localWrittenBytes = buf.transferTo(ch);
495 if (localWrittenBytes != 0) {
496 writtenBytes += localWrittenBytes;
497 break;
498 }
499 if (buf.finished()) {
500 break;
501 }
502 }
503
504 if (buf.finished()) {
505
506 buf.release();
507 channel.currentWriteEvent = null;
508 channel.currentWriteBuffer = null;
509 evt = null;
510 buf = null;
511 future.setSuccess();
512 } else {
513
514 addOpWrite = true;
515 channel.writeSuspended = true;
516
517 if (localWrittenBytes > 0) {
518
519 future.setProgress(
520 localWrittenBytes,
521 buf.writtenBytes(), buf.totalBytes());
522 }
523 break;
524 }
525 } catch (AsynchronousCloseException e) {
526
527 } catch (Throwable t) {
528 if (buf != null) {
529 buf.release();
530 }
531 channel.currentWriteEvent = null;
532 channel.currentWriteBuffer = null;
533 buf = null;
534 evt = null;
535 future.setFailure(t);
536 if (iothread) {
537 fireExceptionCaught(channel, t);
538 } else {
539 fireExceptionCaughtLater(channel, t);
540 }
541 if (t instanceof IOException) {
542 open = false;
543 close(channel, succeededFuture(channel));
544 }
545 }
546 }
547 channel.inWriteNowLoop = false;
548
549
550
551
552
553
554
555 if (open) {
556 if (addOpWrite) {
557 setOpWrite(channel);
558 } else if (removeOpWrite) {
559 clearOpWrite(channel);
560 }
561 }
562 }
563 if (iothread) {
564 fireWriteComplete(channel, writtenBytes);
565 } else {
566 fireWriteCompleteLater(channel, writtenBytes);
567 }
568 }
569
570 static boolean isIoThread(AbstractNioChannel<?> channel) {
571 return Thread.currentThread() == channel.worker.thread;
572 }
573
574 protected void setOpWrite(AbstractNioChannel<?> channel) {
575 Selector selector = this.selector;
576 SelectionKey key = channel.channel.keyFor(selector);
577 if (key == null) {
578 return;
579 }
580 if (!key.isValid()) {
581 close(key);
582 return;
583 }
584
585
586
587 synchronized (channel.interestOpsLock) {
588 int interestOps = channel.getRawInterestOps();
589 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
590 interestOps |= SelectionKey.OP_WRITE;
591 key.interestOps(interestOps);
592 channel.setRawInterestOpsNow(interestOps);
593 }
594 }
595 }
596
597 protected void clearOpWrite(AbstractNioChannel<?> channel) {
598 Selector selector = this.selector;
599 SelectionKey key = channel.channel.keyFor(selector);
600 if (key == null) {
601 return;
602 }
603 if (!key.isValid()) {
604 close(key);
605 return;
606 }
607
608
609
610 synchronized (channel.interestOpsLock) {
611 int interestOps = channel.getRawInterestOps();
612 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
613 interestOps &= ~SelectionKey.OP_WRITE;
614 key.interestOps(interestOps);
615 channel.setRawInterestOpsNow(interestOps);
616 }
617 }
618 }
619
620
621 void close(AbstractNioChannel<?> channel, ChannelFuture future) {
622 boolean connected = channel.isConnected();
623 boolean bound = channel.isBound();
624 boolean iothread = isIoThread(channel);
625
626 try {
627 channel.channel.close();
628 cancelledKeys ++;
629
630 if (channel.setClosed()) {
631 future.setSuccess();
632 if (connected) {
633 if (iothread) {
634 fireChannelDisconnected(channel);
635 } else {
636 fireChannelDisconnectedLater(channel);
637 }
638 }
639 if (bound) {
640 if (iothread) {
641 fireChannelUnbound(channel);
642 } else {
643 fireChannelUnboundLater(channel);
644 }
645 }
646
647 cleanUpWriteBuffer(channel);
648 if (iothread) {
649 fireChannelClosed(channel);
650 } else {
651 fireChannelClosedLater(channel);
652 }
653 } else {
654 future.setSuccess();
655 }
656 } catch (Throwable t) {
657 future.setFailure(t);
658 if (iothread) {
659 fireExceptionCaught(channel, t);
660 } else {
661 fireExceptionCaughtLater(channel, t);
662 }
663 }
664 }
665
666 protected void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
667 Exception cause = null;
668 boolean fireExceptionCaught = false;
669
670
671 synchronized (channel.writeLock) {
672 MessageEvent evt = channel.currentWriteEvent;
673 if (evt != null) {
674
675
676 if (channel.isOpen()) {
677 cause = new NotYetConnectedException();
678 } else {
679 cause = new ClosedChannelException();
680 }
681
682 ChannelFuture future = evt.getFuture();
683 channel.currentWriteBuffer.release();
684 channel.currentWriteBuffer = null;
685 channel.currentWriteEvent = null;
686 evt = null;
687 future.setFailure(cause);
688 fireExceptionCaught = true;
689 }
690
691 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
692 for (;;) {
693 evt = writeBuffer.poll();
694 if (evt == null) {
695 break;
696 }
697
698
699 if (cause == null) {
700 if (channel.isOpen()) {
701 cause = new NotYetConnectedException();
702 } else {
703 cause = new ClosedChannelException();
704 }
705 fireExceptionCaught = true;
706 }
707 evt.getFuture().setFailure(cause);
708
709
710 }
711 }
712
713 if (fireExceptionCaught) {
714 if (isIoThread(channel)) {
715 fireExceptionCaught(channel, cause);
716 } else {
717 fireExceptionCaughtLater(channel, cause);
718 }
719 }
720 }
721
722 void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
723 boolean changed = false;
724 boolean iothread = isIoThread(channel);
725 try {
726
727
728 synchronized (channel.interestOpsLock) {
729 Selector selector = this.selector;
730 SelectionKey key = channel.channel.keyFor(selector);
731
732
733 interestOps &= ~Channel.OP_WRITE;
734 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
735
736 if (key == null || selector == null) {
737 if (channel.getRawInterestOps() != interestOps) {
738 changed = true;
739 }
740
741
742
743 channel.setRawInterestOpsNow(interestOps);
744
745 future.setSuccess();
746 if (changed) {
747 if (iothread) {
748 fireChannelInterestChanged(channel);
749 } else {
750 fireChannelInterestChangedLater(channel);
751 }
752 }
753
754 return;
755 }
756
757 switch (CONSTRAINT_LEVEL) {
758 case 0:
759 if (channel.getRawInterestOps() != interestOps) {
760 key.interestOps(interestOps);
761 if (Thread.currentThread() != thread &&
762 wakenUp.compareAndSet(false, true)) {
763 selector.wakeup();
764 }
765 changed = true;
766 }
767 break;
768 case 1:
769 case 2:
770 if (channel.getRawInterestOps() != interestOps) {
771 if (Thread.currentThread() == thread) {
772 key.interestOps(interestOps);
773 changed = true;
774 } else {
775 selectorGuard.readLock().lock();
776 try {
777 if (wakenUp.compareAndSet(false, true)) {
778 selector.wakeup();
779 }
780 key.interestOps(interestOps);
781 changed = true;
782 } finally {
783 selectorGuard.readLock().unlock();
784 }
785 }
786 }
787 break;
788 default:
789 throw new Error();
790 }
791
792 if (changed) {
793 channel.setRawInterestOpsNow(interestOps);
794 }
795 }
796
797 future.setSuccess();
798 if (changed) {
799 if (iothread) {
800 fireChannelInterestChanged(channel);
801 } else {
802 fireChannelInterestChangedLater(channel);
803 }
804 }
805 } catch (CancelledKeyException e) {
806
807 ClosedChannelException cce = new ClosedChannelException();
808 future.setFailure(cce);
809 if (iothread) {
810 fireExceptionCaught(channel, cce);
811 } else {
812 fireExceptionCaughtLater(channel, cce);
813 }
814 } catch (Throwable t) {
815 future.setFailure(t);
816 if (iothread) {
817 fireExceptionCaught(channel, t);
818 } else {
819 fireExceptionCaughtLater(channel, t);
820 }
821 }
822 }
823
824
825
826
827
828
829
830
/**
 * Called by the selector loop for a key that is readable or reports no ready ops.
 *
 * @param k the selected key
 * @return {@code false} to make the loop skip the write step and the cancelled-key
 *         check for this key; {@code true} to continue with them
 */
831 protected abstract boolean read(SelectionKey k);
832
833
834
835
836
837
838
839
/**
 * Creates the task that register() queues for execution by the selector loop.
 *
 * @param channel the channel passed to register()
 * @param future  the future passed to register()
 * @return the task to queue (presumably it registers the channel with the selector
 *         and completes the future — confirm in subclasses)
 */
840 protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
841
842 }