View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.concurrent.Executor;
28  
29  import org.jboss.netty.buffer.ChannelBuffer;
30  import org.jboss.netty.buffer.ChannelBufferFactory;
31  import org.jboss.netty.channel.ChannelException;
32  import org.jboss.netty.channel.ChannelFuture;
33  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
34  
35  public class NioWorker extends AbstractNioWorker {
36  
37      private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
38  
39      public NioWorker(Executor executor) {
40          super(executor);
41      }
42  
43      /**
44       *
45       * @deprecated use {@link #NioWorker(Executor)}
46       */
47      @Deprecated
48      public NioWorker(Executor executor, boolean allowShutdownOnIdle) {
49          super(executor, allowShutdownOnIdle);
50      }
51  
52  
53      @Override
54      protected boolean read(SelectionKey k) {
55          final SocketChannel ch = (SocketChannel) k.channel();
56          final NioSocketChannel channel = (NioSocketChannel) k.attachment();
57  
58          final ReceiveBufferSizePredictor predictor =
59              channel.getConfig().getReceiveBufferSizePredictor();
60          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
61          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
62  
63          int ret = 0;
64          int readBytes = 0;
65          boolean failure = true;
66  
67          ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
68          try {
69              while ((ret = ch.read(bb)) > 0) {
70                  readBytes += ret;
71                  if (!bb.hasRemaining()) {
72                      break;
73                  }
74              }
75              failure = false;
76          } catch (ClosedChannelException e) {
77              // Can happen, and does not need a user attention.
78          } catch (Throwable t) {
79              fireExceptionCaught(channel, t);
80          }
81  
82          if (readBytes > 0) {
83              bb.flip();
84  
85              final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
86              buffer.setBytes(0, bb);
87              buffer.writerIndex(readBytes);
88  
89  
90              // Update the predictor.
91              predictor.previousReceiveBufferSize(readBytes);
92  
93              // Fire the event.
94              fireMessageReceived(channel, buffer);
95          }
96  
97          if (ret < 0 || failure) {
98              k.cancel(); // Some JDK implementations run into an infinite loop without this.
99              close(channel, succeededFuture(channel));
100             return false;
101         }
102 
103         return true;
104     }
105 
106 
107     @Override
108     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
109         final Thread currentThread = Thread.currentThread();
110         final Thread workerThread = thread;
111         if (currentThread != workerThread) {
112             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
113                 boolean offered = writeTaskQueue.offer(channel.writeTask);
114                 assert offered;
115             }
116 
117             if (!(channel instanceof NioAcceptedSocketChannel) ||
118                 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
119                 final Selector workerSelector = selector;
120                 if (workerSelector != null) {
121                     if (wakenUp.compareAndSet(false, true)) {
122                         workerSelector.wakeup();
123                     }
124                 }
125             } else {
126                 // A write request can be made from an acceptor thread (boss)
127                 // when a user attempted to write something in:
128                 //
129                 //   * channelOpen()
130                 //   * channelBound()
131                 //   * channelConnected().
132                 //
133                 // In this case, there's no need to wake up the selector because
134                 // the channel is not even registered yet at this moment.
135             }
136 
137             return true;
138         }
139 
140         return false;
141     }
142 
143     @Override
144     protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
145         boolean server = !(channel instanceof NioClientSocketChannel);
146         return new RegisterTask((NioSocketChannel) channel, future, server);
147     }
148 
149     private final class RegisterTask implements Runnable {
150         private final NioSocketChannel channel;
151         private final ChannelFuture future;
152         private final boolean server;
153 
154         RegisterTask(
155                 NioSocketChannel channel, ChannelFuture future, boolean server) {
156 
157             this.channel = channel;
158             this.future = future;
159             this.server = server;
160         }
161 
162         public void run() {
163             SocketAddress localAddress = channel.getLocalAddress();
164             SocketAddress remoteAddress = channel.getRemoteAddress();
165 
166             if (localAddress == null || remoteAddress == null) {
167                 if (future != null) {
168                     future.setFailure(new ClosedChannelException());
169                 }
170                 close(channel, succeededFuture(channel));
171                 return;
172             }
173 
174             try {
175                 if (server) {
176                     channel.channel.configureBlocking(false);
177                 }
178 
179                 synchronized (channel.interestOpsLock) {
180                     channel.channel.register(
181                             selector, channel.getRawInterestOps(), channel);
182                 }
183                 if (future != null) {
184                     channel.setConnected();
185                     future.setSuccess();
186                 }
187 
188                 if (server || !((NioClientSocketChannel) channel).boundManually) {
189                     fireChannelBound(channel, localAddress);
190                 }
191                 fireChannelConnected(channel, remoteAddress);
192             } catch (IOException e) {
193                 if (future != null) {
194                     future.setFailure(e);
195                 }
196                 close(channel, succeededFuture(channel));
197                 if (!(e instanceof ClosedChannelException)) {
198                     throw new ChannelException(
199                             "Failed to register a socket to the selector.", e);
200                 }
201             }
202 
203         }
204     }
205 
206 }