1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.concurrent.Executor;
28
29 import org.jboss.netty.buffer.ChannelBuffer;
30 import org.jboss.netty.buffer.ChannelBufferFactory;
31 import org.jboss.netty.channel.ChannelException;
32 import org.jboss.netty.channel.ChannelFuture;
33 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
34
35 public class NioWorker extends AbstractNioWorker {
36
37 private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
38
39 public NioWorker(Executor executor) {
40 super(executor);
41 }
42
43
44
45
46
47 @Deprecated
48 public NioWorker(Executor executor, boolean allowShutdownOnIdle) {
49 super(executor, allowShutdownOnIdle);
50 }
51
52
53 @Override
54 protected boolean read(SelectionKey k) {
55 final SocketChannel ch = (SocketChannel) k.channel();
56 final NioSocketChannel channel = (NioSocketChannel) k.attachment();
57
58 final ReceiveBufferSizePredictor predictor =
59 channel.getConfig().getReceiveBufferSizePredictor();
60 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
61 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
62
63 int ret = 0;
64 int readBytes = 0;
65 boolean failure = true;
66
67 ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
68 try {
69 while ((ret = ch.read(bb)) > 0) {
70 readBytes += ret;
71 if (!bb.hasRemaining()) {
72 break;
73 }
74 }
75 failure = false;
76 } catch (ClosedChannelException e) {
77
78 } catch (Throwable t) {
79 fireExceptionCaught(channel, t);
80 }
81
82 if (readBytes > 0) {
83 bb.flip();
84
85 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
86 buffer.setBytes(0, bb);
87 buffer.writerIndex(readBytes);
88
89
90
91 predictor.previousReceiveBufferSize(readBytes);
92
93
94 fireMessageReceived(channel, buffer);
95 }
96
97 if (ret < 0 || failure) {
98 k.cancel();
99 close(channel, succeededFuture(channel));
100 return false;
101 }
102
103 return true;
104 }
105
106
107 @Override
108 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
109 final Thread currentThread = Thread.currentThread();
110 final Thread workerThread = thread;
111 if (currentThread != workerThread) {
112 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
113 boolean offered = writeTaskQueue.offer(channel.writeTask);
114 assert offered;
115 }
116
117 if (!(channel instanceof NioAcceptedSocketChannel) ||
118 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
119 final Selector workerSelector = selector;
120 if (workerSelector != null) {
121 if (wakenUp.compareAndSet(false, true)) {
122 workerSelector.wakeup();
123 }
124 }
125 } else {
126
127
128
129
130
131
132
133
134
135 }
136
137 return true;
138 }
139
140 return false;
141 }
142
143 @Override
144 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
145 boolean server = !(channel instanceof NioClientSocketChannel);
146 return new RegisterTask((NioSocketChannel) channel, future, server);
147 }
148
149 private final class RegisterTask implements Runnable {
150 private final NioSocketChannel channel;
151 private final ChannelFuture future;
152 private final boolean server;
153
154 RegisterTask(
155 NioSocketChannel channel, ChannelFuture future, boolean server) {
156
157 this.channel = channel;
158 this.future = future;
159 this.server = server;
160 }
161
162 public void run() {
163 SocketAddress localAddress = channel.getLocalAddress();
164 SocketAddress remoteAddress = channel.getRemoteAddress();
165
166 if (localAddress == null || remoteAddress == null) {
167 if (future != null) {
168 future.setFailure(new ClosedChannelException());
169 }
170 close(channel, succeededFuture(channel));
171 return;
172 }
173
174 try {
175 if (server) {
176 channel.channel.configureBlocking(false);
177 }
178
179 synchronized (channel.interestOpsLock) {
180 channel.channel.register(
181 selector, channel.getRawInterestOps(), channel);
182 }
183 if (future != null) {
184 channel.setConnected();
185 future.setSuccess();
186 }
187
188 if (server || !((NioClientSocketChannel) channel).boundManually) {
189 fireChannelBound(channel, localAddress);
190 }
191 fireChannelConnected(channel, remoteAddress);
192 } catch (IOException e) {
193 if (future != null) {
194 future.setFailure(e);
195 }
196 close(channel, succeededFuture(channel));
197 if (!(e instanceof ClosedChannelException)) {
198 throw new ChannelException(
199 "Failed to register a socket to the selector.", e);
200 }
201 }
202
203 }
204 }
205
206 }