View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.nio.channels.AsynchronousCloseException;
22  import java.nio.channels.CancelledKeyException;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.NotYetConnectedException;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.WritableByteChannel;
28  import java.util.Iterator;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.concurrent.locks.ReadWriteLock;
37  import java.util.concurrent.locks.ReentrantReadWriteLock;
38  
39  import org.jboss.netty.channel.Channel;
40  import org.jboss.netty.channel.ChannelException;
41  import org.jboss.netty.channel.ChannelFuture;
42  import org.jboss.netty.channel.MessageEvent;
43  import org.jboss.netty.channel.socket.Worker;
44  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
45  import org.jboss.netty.logging.InternalLogger;
46  import org.jboss.netty.logging.InternalLoggerFactory;
47  import org.jboss.netty.util.ThreadRenamingRunnable;
48  import org.jboss.netty.util.internal.DeadLockProofWorker;
49  
50  abstract class AbstractNioWorker implements Worker {
51  
52  
53      private static final AtomicInteger nextId = new AtomicInteger();
54  
55      final int id = nextId.incrementAndGet();
56  
57      /**
58       * Internal Netty logger.
59       */
60      private static final InternalLogger logger = InternalLoggerFactory
61              .getInstance(AbstractNioWorker.class);
62  
63      private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
64  
65      static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
66  
67  
68      /**
69       * Executor used to execute {@link Runnable}s such as channel registration
70       * task.
71       */
72      private final Executor executor;
73  
74      /**
75       * Boolean to indicate if this worker has been started.
76       */
77      private boolean started;
78  
79      /**
80       * If this worker has been started thread will be a reference to the thread
81       * used when starting. i.e. the current thread when the run method is executed.
82       */
83      protected volatile Thread thread;
84  
85      /**
86       * The NIO {@link Selector}.
87       */
88      volatile Selector selector;
89  
90      /**
91       * Boolean that controls determines if a blocked Selector.select should
92       * break out of its selection process. In our case we use a timeone for
93       * the select method and the select method will block for that time unless
94       * waken up.
95       */
96      protected final AtomicBoolean wakenUp = new AtomicBoolean();
97  
98      /**
99       * Lock for this workers Selector.
100      */
101     private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
102 
103     /**
104      * Monitor object used to synchronize selector open/close.
105      */
106     private final Object startStopLock = new Object();
107 
108     /**
109      * Queue of channel registration tasks.
110      */
111     private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
112 
113     /**
114      * Queue of WriteTasks
115      */
116     protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
117 
118     private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
119 
120 
121     private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
122 
123     protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
124 
125     private final boolean allowShutdownOnIdle;
126 
127     AbstractNioWorker(Executor executor) {
128         this(executor, true);
129     }
130 
131     /**
132      *
133      * @deprecated Use {@link #AbstractNioWorker(Executor)}
134      */
135     @Deprecated
136     public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
137         this.executor = executor;
138         this.allowShutdownOnIdle = allowShutdownOnIdle;
139     }
140 
141     void register(AbstractNioChannel<?> channel, ChannelFuture future) {
142 
143         Runnable registerTask = createRegisterTask(channel, future);
144 
145         synchronized (startStopLock) {
146             Selector selector = start();
147 
148 
149             boolean offered = registerTaskQueue.offer(registerTask);
150             assert offered;
151 
152             if (wakenUp.compareAndSet(false, true)) {
153                 // wake up the selector to speed things
154                 selector = this.selector;
155 
156                 // Check if the selector is not null to prevent NPE if selector was
157                 // set to null from another thread. See #469
158                 if (selector != null) {
159                     selector.wakeup();
160                 }
161             }
162 
163         }
164     }
165 
166     /**
167      * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
168      * the {@link AbstractNioChannel}'s when they get registered
169      *
170      * @return selector
171      */
172     private Selector start() {
173         if (!started) {
174             // Open a selector if this worker didn't start yet.
175             try {
176                 selector = Selector.open();
177             } catch (Throwable t) {
178                 throw new ChannelException("Failed to create a selector.", t);
179             }
180 
181             // Start the worker thread with the new Selector.
182             boolean success = false;
183             try {
184                 DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
185                 success = true;
186             } finally {
187                 if (!success) {
188                     // Release the Selector if the execution fails.
189                     try {
190                         selector.close();
191                     } catch (Throwable t) {
192                         logger.warn("Failed to close a selector.", t);
193                     }
194                     selector = null;
195                     // The method will return to the caller at this point.
196                 }
197             }
198         }
199 
200         assert selector != null && selector.isOpen();
201 
202         started = true;
203         return selector;
204     }
205 
206 
207     public void run() {
208         thread = Thread.currentThread();
209 
210         boolean shutdown = false;
211         Selector selector = this.selector;
212         for (;;) {
213             wakenUp.set(false);
214 
215             if (CONSTRAINT_LEVEL != 0) {
216                 selectorGuard.writeLock().lock();
217                     // This empty synchronization block prevents the selector
218                     // from acquiring its lock.
219                 selectorGuard.writeLock().unlock();
220             }
221 
222             try {
223                 SelectorUtil.select(selector);
224 
225                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
226                 // before calling 'selector.wakeup()' to reduce the wake-up
227                 // overhead. (Selector.wakeup() is an expensive operation.)
228                 //
229                 // However, there is a race condition in this approach.
230                 // The race condition is triggered when 'wakenUp' is set to
231                 // true too early.
232                 //
233                 // 'wakenUp' is set to true too early if:
234                 // 1) Selector is waken up between 'wakenUp.set(false)' and
235                 //    'selector.select(...)'. (BAD)
236                 // 2) Selector is waken up between 'selector.select(...)' and
237                 //    'if (wakenUp.get()) { ... }'. (OK)
238                 //
239                 // In the first case, 'wakenUp' is set to true and the
240                 // following 'selector.select(...)' will wake up immediately.
241                 // Until 'wakenUp' is set to false again in the next round,
242                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
243                 // any attempt to wake up the Selector will fail, too, causing
244                 // the following 'selector.select(...)' call to block
245                 // unnecessarily.
246                 //
247                 // To fix this problem, we wake up the selector again if wakenUp
248                 // is true immediately after selector.select(...).
249                 // It is inefficient in that it wakes up the selector for both
250                 // the first case (BAD - wake-up required) and the second case
251                 // (OK - no wake-up required).
252 
253                 if (wakenUp.get()) {
254                     selector.wakeup();
255                 }
256 
257                 cancelledKeys = 0;
258                 processRegisterTaskQueue();
259                 processEventQueue();
260                 processWriteTaskQueue();
261                 processSelectedKeys(selector.selectedKeys());
262 
263                 // Exit the loop when there's nothing to handle.
264                 // The shutdown flag is used to delay the shutdown of this
265                 // loop to avoid excessive Selector creation when
266                 // connections are registered in a one-by-one manner instead of
267                 // concurrent manner.
268                 if (selector.keys().isEmpty()) {
269                     if (shutdown ||
270                         executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
271 
272                         synchronized (startStopLock) {
273                             if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
274                                 started = false;
275                                 try {
276                                     selector.close();
277                                 } catch (IOException e) {
278                                     logger.warn(
279                                             "Failed to close a selector.", e);
280                                 } finally {
281                                     this.selector = null;
282                                 }
283                                 break;
284                             } else {
285                                 shutdown = false;
286                             }
287                         }
288                     } else {
289                         if (allowShutdownOnIdle) {
290                             // Give one more second.
291                             shutdown = true;
292                         }
293                     }
294                 } else {
295                     shutdown = false;
296                 }
297             } catch (Throwable t) {
298                 logger.warn(
299                         "Unexpected exception in the selector loop.", t);
300 
301                 // Prevent possible consecutive immediate failures that lead to
302                 // excessive CPU consumption.
303                 try {
304                     Thread.sleep(1000);
305                 } catch (InterruptedException e) {
306                     // Ignore.
307                 }
308             }
309         }
310     }
311 
312     public void executeInIoThread(Runnable task) {
313         executeInIoThread(task, false);
314     }
315 
316     /**
317      * Execute the {@link Runnable} in a IO-Thread
318      *
319      * @param task
320      *            the {@link Runnable} to execute
321      * @param alwaysAsync
322      *            <code>true</code> if the {@link Runnable} should be executed
323      *            in an async fashion even if the current Thread == IO Thread
324      */
325     public void executeInIoThread(Runnable task, boolean alwaysAsync) {
326         if (!alwaysAsync && Thread.currentThread() == thread) {
327             task.run();
328         } else {
329             synchronized (startStopLock) {
330                 start();
331                 boolean added = eventQueue.offer(task);
332 
333                 assert added;
334                 if (added) {
335                     // wake up the selector to speed things
336                     Selector selector = this.selector;
337                     if (selector != null) {
338                         selector.wakeup();
339                     }
340                 }
341             }
342 
343         }
344 
345     }
346 
347 
348     private void processRegisterTaskQueue() throws IOException {
349         for (;;) {
350             final Runnable task = registerTaskQueue.poll();
351             if (task == null) {
352                 break;
353             }
354 
355             task.run();
356             cleanUpCancelledKeys();
357         }
358     }
359 
360     private void processWriteTaskQueue() throws IOException {
361         for (;;) {
362             final Runnable task = writeTaskQueue.poll();
363             if (task == null) {
364                 break;
365             }
366 
367             task.run();
368             cleanUpCancelledKeys();
369         }
370     }
371 
372     private void processEventQueue() throws IOException {
373         for (;;) {
374             final Runnable task = eventQueue.poll();
375             if (task == null) {
376                 break;
377             }
378             task.run();
379             cleanUpCancelledKeys();
380         }
381     }
382 
383     private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
384         for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
385             SelectionKey k = i.next();
386             i.remove();
387             try {
388                 int readyOps = k.readyOps();
389                 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
390                     if (!read(k)) {
391                         // Connection already closed - no need to handle write.
392                         continue;
393                     }
394                 }
395                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
396                     writeFromSelectorLoop(k);
397                 }
398             } catch (CancelledKeyException e) {
399                 close(k);
400             }
401 
402             if (cleanUpCancelledKeys()) {
403                 break; // break the loop to avoid ConcurrentModificationException
404             }
405         }
406     }
407 
408     private boolean cleanUpCancelledKeys() throws IOException {
409         if (cancelledKeys >= CLEANUP_INTERVAL) {
410             cancelledKeys = 0;
411             selector.selectNow();
412             return true;
413         }
414         return false;
415     }
416 
417 
418 
419     private void close(SelectionKey k) {
420         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
421         close(ch, succeededFuture(ch));
422     }
423 
424     void writeFromUserCode(final AbstractNioChannel<?> channel) {
425         if (!channel.isConnected()) {
426             cleanUpWriteBuffer(channel);
427             return;
428         }
429 
430         if (scheduleWriteIfNecessary(channel)) {
431             return;
432         }
433 
434         // From here, we are sure Thread.currentThread() == workerThread.
435 
436         if (channel.writeSuspended) {
437             return;
438         }
439 
440         if (channel.inWriteNowLoop) {
441             return;
442         }
443 
444         write0(channel);
445     }
446 
447     void writeFromTaskLoop(AbstractNioChannel<?> ch) {
448         if (!ch.writeSuspended) {
449             write0(ch);
450         }
451     }
452 
453     void writeFromSelectorLoop(final SelectionKey k) {
454         AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
455         ch.writeSuspended = false;
456         write0(ch);
457     }
458 
459     protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel);
460 
461     protected void write0(AbstractNioChannel<?> channel) {
462         boolean open = true;
463         boolean addOpWrite = false;
464         boolean removeOpWrite = false;
465         boolean iothread = isIoThread(channel);
466 
467         long writtenBytes = 0;
468 
469         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
470         final WritableByteChannel ch = channel.channel;
471         final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
472         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
473         synchronized (channel.writeLock) {
474             channel.inWriteNowLoop = true;
475             for (;;) {
476                 MessageEvent evt = channel.currentWriteEvent;
477                 SendBuffer buf;
478                 if (evt == null) {
479                     if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
480                         removeOpWrite = true;
481                         channel.writeSuspended = false;
482                         break;
483                     }
484 
485                     channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
486                 } else {
487                     buf = channel.currentWriteBuffer;
488                 }
489 
490                 ChannelFuture future = evt.getFuture();
491                 try {
492                     long localWrittenBytes = 0;
493                     for (int i = writeSpinCount; i > 0; i --) {
494                         localWrittenBytes = buf.transferTo(ch);
495                         if (localWrittenBytes != 0) {
496                             writtenBytes += localWrittenBytes;
497                             break;
498                         }
499                         if (buf.finished()) {
500                             break;
501                         }
502                     }
503 
504                     if (buf.finished()) {
505                         // Successful write - proceed to the next message.
506                         buf.release();
507                         channel.currentWriteEvent = null;
508                         channel.currentWriteBuffer = null;
509                         evt = null;
510                         buf = null;
511                         future.setSuccess();
512                     } else {
513                         // Not written fully - perhaps the kernel buffer is full.
514                         addOpWrite = true;
515                         channel.writeSuspended = true;
516 
517                         if (localWrittenBytes > 0) {
518                             // Notify progress listeners if necessary.
519                             future.setProgress(
520                                     localWrittenBytes,
521                                     buf.writtenBytes(), buf.totalBytes());
522                         }
523                         break;
524                     }
525                 } catch (AsynchronousCloseException e) {
526                     // Doesn't need a user attention - ignore.
527                 } catch (Throwable t) {
528                     if (buf != null) {
529                         buf.release();
530                     }
531                     channel.currentWriteEvent = null;
532                     channel.currentWriteBuffer = null;
533                     buf = null;
534                     evt = null;
535                     future.setFailure(t);
536                     if (iothread) {
537                         fireExceptionCaught(channel, t);
538                     } else {
539                         fireExceptionCaughtLater(channel, t);
540                     }
541                     if (t instanceof IOException) {
542                         open = false;
543                         close(channel, succeededFuture(channel));
544                     }
545                 }
546             }
547             channel.inWriteNowLoop = false;
548 
549             // Initially, the following block was executed after releasing
550             // the writeLock, but there was a race condition, and it has to be
551             // executed before releasing the writeLock:
552             //
553             //     https://issues.jboss.org/browse/NETTY-410
554             //
555             if (open) {
556                 if (addOpWrite) {
557                     setOpWrite(channel);
558                 } else if (removeOpWrite) {
559                     clearOpWrite(channel);
560                 }
561             }
562         }
563         if (iothread) {
564             fireWriteComplete(channel, writtenBytes);
565         } else {
566             fireWriteCompleteLater(channel, writtenBytes);
567         }
568     }
569 
570     static boolean isIoThread(AbstractNioChannel<?> channel) {
571         return Thread.currentThread() == channel.worker.thread;
572     }
573 
574     protected void setOpWrite(AbstractNioChannel<?> channel) {
575         Selector selector = this.selector;
576         SelectionKey key = channel.channel.keyFor(selector);
577         if (key == null) {
578             return;
579         }
580         if (!key.isValid()) {
581             close(key);
582             return;
583         }
584 
585         // interestOps can change at any time and at any thread.
586         // Acquire a lock to avoid possible race condition.
587         synchronized (channel.interestOpsLock) {
588             int interestOps = channel.getRawInterestOps();
589             if ((interestOps & SelectionKey.OP_WRITE) == 0) {
590                 interestOps |= SelectionKey.OP_WRITE;
591                 key.interestOps(interestOps);
592                 channel.setRawInterestOpsNow(interestOps);
593             }
594         }
595     }
596 
597     protected void clearOpWrite(AbstractNioChannel<?> channel) {
598         Selector selector = this.selector;
599         SelectionKey key = channel.channel.keyFor(selector);
600         if (key == null) {
601             return;
602         }
603         if (!key.isValid()) {
604             close(key);
605             return;
606         }
607 
608         // interestOps can change at any time and at any thread.
609         // Acquire a lock to avoid possible race condition.
610         synchronized (channel.interestOpsLock) {
611             int interestOps = channel.getRawInterestOps();
612             if ((interestOps & SelectionKey.OP_WRITE) != 0) {
613                 interestOps &= ~SelectionKey.OP_WRITE;
614                 key.interestOps(interestOps);
615                 channel.setRawInterestOpsNow(interestOps);
616             }
617         }
618     }
619 
620 
621     void close(AbstractNioChannel<?> channel, ChannelFuture future) {
622         boolean connected = channel.isConnected();
623         boolean bound = channel.isBound();
624         boolean iothread = isIoThread(channel);
625 
626         try {
627             channel.channel.close();
628             cancelledKeys ++;
629 
630             if (channel.setClosed()) {
631                 future.setSuccess();
632                 if (connected) {
633                     if (iothread) {
634                         fireChannelDisconnected(channel);
635                     } else {
636                         fireChannelDisconnectedLater(channel);
637                     }
638                 }
639                 if (bound) {
640                     if (iothread) {
641                         fireChannelUnbound(channel);
642                     } else {
643                         fireChannelUnboundLater(channel);
644                     }
645                 }
646 
647                 cleanUpWriteBuffer(channel);
648                 if (iothread) {
649                     fireChannelClosed(channel);
650                 } else {
651                     fireChannelClosedLater(channel);
652                 }
653             } else {
654                 future.setSuccess();
655             }
656         } catch (Throwable t) {
657             future.setFailure(t);
658             if (iothread) {
659                 fireExceptionCaught(channel, t);
660             } else {
661                 fireExceptionCaughtLater(channel, t);
662             }
663         }
664     }
665 
666     protected void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
667         Exception cause = null;
668         boolean fireExceptionCaught = false;
669 
670         // Clean up the stale messages in the write buffer.
671         synchronized (channel.writeLock) {
672             MessageEvent evt = channel.currentWriteEvent;
673             if (evt != null) {
674                 // Create the exception only once to avoid the excessive overhead
675                 // caused by fillStackTrace.
676                 if (channel.isOpen()) {
677                     cause = new NotYetConnectedException();
678                 } else {
679                     cause = new ClosedChannelException();
680                 }
681 
682                 ChannelFuture future = evt.getFuture();
683                 channel.currentWriteBuffer.release();
684                 channel.currentWriteBuffer = null;
685                 channel.currentWriteEvent = null;
686                 evt = null;
687                 future.setFailure(cause);
688                 fireExceptionCaught = true;
689             }
690 
691             Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
692             for (;;) {
693                 evt = writeBuffer.poll();
694                 if (evt == null) {
695                     break;
696                 }
697                 // Create the exception only once to avoid the excessive overhead
698                 // caused by fillStackTrace.
699                 if (cause == null) {
700                     if (channel.isOpen()) {
701                         cause = new NotYetConnectedException();
702                     } else {
703                         cause = new ClosedChannelException();
704                     }
705                     fireExceptionCaught = true;
706                 }
707                 evt.getFuture().setFailure(cause);
708 
709 
710             }
711         }
712 
713         if (fireExceptionCaught) {
714             if (isIoThread(channel)) {
715                 fireExceptionCaught(channel, cause);
716             } else {
717                 fireExceptionCaughtLater(channel, cause);
718             }
719         }
720     }
721 
722     void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
723         boolean changed = false;
724         boolean iothread = isIoThread(channel);
725         try {
726             // interestOps can change at any time and at any thread.
727             // Acquire a lock to avoid possible race condition.
728             synchronized (channel.interestOpsLock) {
729                 Selector selector = this.selector;
730                 SelectionKey key = channel.channel.keyFor(selector);
731 
732                 // Override OP_WRITE flag - a user cannot change this flag.
733                 interestOps &= ~Channel.OP_WRITE;
734                 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
735 
736                 if (key == null || selector == null) {
737                     if (channel.getRawInterestOps() != interestOps) {
738                         changed = true;
739                     }
740 
741                     // Not registered to the worker yet.
742                     // Set the rawInterestOps immediately; RegisterTask will pick it up.
743                     channel.setRawInterestOpsNow(interestOps);
744 
745                     future.setSuccess();
746                     if (changed) {
747                         if (iothread) {
748                             fireChannelInterestChanged(channel);
749                         } else {
750                             fireChannelInterestChangedLater(channel);
751                         }
752                     }
753 
754                     return;
755                 }
756 
757                 switch (CONSTRAINT_LEVEL) {
758                 case 0:
759                     if (channel.getRawInterestOps() != interestOps) {
760                         key.interestOps(interestOps);
761                         if (Thread.currentThread() != thread &&
762                             wakenUp.compareAndSet(false, true)) {
763                             selector.wakeup();
764                         }
765                         changed = true;
766                     }
767                     break;
768                 case 1:
769                 case 2:
770                     if (channel.getRawInterestOps() != interestOps) {
771                         if (Thread.currentThread() == thread) {
772                             key.interestOps(interestOps);
773                             changed = true;
774                         } else {
775                             selectorGuard.readLock().lock();
776                             try {
777                                 if (wakenUp.compareAndSet(false, true)) {
778                                     selector.wakeup();
779                                 }
780                                 key.interestOps(interestOps);
781                                 changed = true;
782                             } finally {
783                                 selectorGuard.readLock().unlock();
784                             }
785                         }
786                     }
787                     break;
788                 default:
789                     throw new Error();
790                 }
791 
792                 if (changed) {
793                     channel.setRawInterestOpsNow(interestOps);
794                 }
795             }
796 
797             future.setSuccess();
798             if (changed) {
799                 if (iothread) {
800                     fireChannelInterestChanged(channel);
801                 } else {
802                     fireChannelInterestChangedLater(channel);
803                 }
804             }
805         } catch (CancelledKeyException e) {
806             // setInterestOps() was called on a closed channel.
807             ClosedChannelException cce = new ClosedChannelException();
808             future.setFailure(cce);
809             if (iothread) {
810                 fireExceptionCaught(channel, cce);
811             } else {
812                 fireExceptionCaughtLater(channel, cce);
813             }
814         } catch (Throwable t) {
815             future.setFailure(t);
816             if (iothread) {
817                 fireExceptionCaught(channel, t);
818             } else {
819                 fireExceptionCaughtLater(channel, t);
820             }
821         }
822     }
823 
824     /**
825      * Read is called when a Selector has been notified that the underlying channel
826      * was something to be read. The channel would previously have registered its interest
827      * in read operations.
828      *
829      * @param k The selection key which contains the Selector registration information.
830      */
831     protected abstract boolean read(SelectionKey k);
832 
833     /**
834      * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel}
835      *
836      * @param channel
837      * @param future
838      * @return task
839      */
840     protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
841 
842 }