1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.Queue;
29 import java.util.concurrent.Executor;
30
31 import org.jboss.netty.buffer.ChannelBuffer;
32 import org.jboss.netty.buffer.ChannelBufferFactory;
33 import org.jboss.netty.channel.ChannelException;
34 import org.jboss.netty.channel.ChannelFuture;
35 import org.jboss.netty.channel.Channels;
36 import org.jboss.netty.channel.MessageEvent;
37 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
38
39
40
41
42
43 public class NioDatagramWorker extends AbstractNioWorker {
44
45 private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
46
47
48
49
50
51
52
53 NioDatagramWorker(final Executor executor) {
54 super(executor);
55 }
56
57
58
59
60
61 @Deprecated
62 NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) {
63 super(executor, allowShutdownOnIdle);
64 }
65
66 @Override
67 protected boolean read(final SelectionKey key) {
68 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
69 ReceiveBufferSizePredictor predictor =
70 channel.getConfig().getReceiveBufferSizePredictor();
71 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
72 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
73 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
74
75 final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
76
77 boolean failure = true;
78 SocketAddress remoteAddress = null;
79 try {
80
81
82 remoteAddress = nioChannel.receive(byteBuffer);
83 failure = false;
84 } catch (ClosedChannelException e) {
85
86 } catch (Throwable t) {
87 fireExceptionCaught(channel, t);
88 }
89
90 if (remoteAddress != null) {
91
92 byteBuffer.flip();
93
94 int readBytes = byteBuffer.remaining();
95 if (readBytes > 0) {
96
97 predictor.previousReceiveBufferSize(readBytes);
98
99 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
100 buffer.setBytes(0, byteBuffer);
101 buffer.writerIndex(readBytes);
102
103
104 predictor.previousReceiveBufferSize(readBytes);
105
106
107 fireMessageReceived(
108 channel, buffer, remoteAddress);
109 }
110 }
111
112 if (failure) {
113 key.cancel();
114 close(channel, succeededFuture(channel));
115 return false;
116 }
117
118 return true;
119 }
120
121
122 @Override
123 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
124 final Thread workerThread = thread;
125 if (workerThread == null || Thread.currentThread() != workerThread) {
126 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
127
128 boolean offered = writeTaskQueue.offer(channel.writeTask);
129 assert offered;
130 }
131
132 final Selector selector = this.selector;
133 if (selector != null) {
134 if (wakenUp.compareAndSet(false, true)) {
135 selector.wakeup();
136 }
137 }
138 return true;
139 }
140
141 return false;
142 }
143
144
145 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
146 boolean connected = channel.isConnected();
147 boolean iothread = isIoThread(channel);
148 try {
149 channel.getDatagramChannel().disconnect();
150 future.setSuccess();
151 if (connected) {
152 if (iothread) {
153 fireChannelDisconnected(channel);
154 } else {
155 fireChannelDisconnectedLater(channel);
156 }
157 }
158 } catch (Throwable t) {
159 future.setFailure(t);
160 if (iothread) {
161 fireExceptionCaught(channel, t);
162 } else {
163 fireExceptionCaughtLater(channel, t);
164 }
165 }
166 }
167
168
169 @Override
170 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
171 return new ChannelRegistionTask((NioDatagramChannel) channel, future);
172 }
173
174
175
176
177
178 private final class ChannelRegistionTask implements Runnable {
179 private final NioDatagramChannel channel;
180
181 private final ChannelFuture future;
182
183 ChannelRegistionTask(final NioDatagramChannel channel,
184 final ChannelFuture future) {
185 this.channel = channel;
186 this.future = future;
187 }
188
189
190
191
192
193 public void run() {
194 final SocketAddress localAddress = channel.getLocalAddress();
195 if (localAddress == null) {
196 if (future != null) {
197 future.setFailure(new ClosedChannelException());
198 }
199 close(channel, succeededFuture(channel));
200 return;
201 }
202
203 try {
204 synchronized (channel.interestOpsLock) {
205 channel.getDatagramChannel().register(
206 selector, channel.getRawInterestOps(), channel);
207 }
208 if (future != null) {
209 future.setSuccess();
210 }
211 } catch (final IOException e) {
212 if (future != null) {
213 future.setFailure(e);
214 }
215 close(channel, succeededFuture(channel));
216
217 if (!(e instanceof ClosedChannelException)) {
218 throw new ChannelException(
219 "Failed to register a socket to the selector.", e);
220 }
221 }
222 }
223 }
224
225 @Override
226 public void writeFromUserCode(final AbstractNioChannel<?> channel) {
227
228
229
230
231
232 if (!channel.isBound()) {
233 cleanUpWriteBuffer(channel);
234 return;
235 }
236
237 if (scheduleWriteIfNecessary(channel)) {
238 return;
239 }
240
241
242
243 if (channel.writeSuspended) {
244 return;
245 }
246
247 if (channel.inWriteNowLoop) {
248 return;
249 }
250
251 write0(channel);
252 }
253
254 @Override
255 protected void write0(final AbstractNioChannel<?> channel) {
256
257 boolean addOpWrite = false;
258 boolean removeOpWrite = false;
259
260 long writtenBytes = 0;
261
262 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
263 final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
264 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
265 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
266 synchronized (channel.writeLock) {
267
268 channel.inWriteNowLoop = true;
269
270
271 for (;;) {
272 MessageEvent evt = channel.currentWriteEvent;
273 SocketSendBufferPool.SendBuffer buf;
274 if (evt == null) {
275 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
276 removeOpWrite = true;
277 channel.writeSuspended = false;
278 break;
279 }
280
281 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
282 } else {
283 buf = channel.currentWriteBuffer;
284 }
285
286 try {
287 long localWrittenBytes = 0;
288 SocketAddress raddr = evt.getRemoteAddress();
289 if (raddr == null) {
290 for (int i = writeSpinCount; i > 0; i --) {
291 localWrittenBytes = buf.transferTo(ch);
292 if (localWrittenBytes != 0) {
293 writtenBytes += localWrittenBytes;
294 break;
295 }
296 if (buf.finished()) {
297 break;
298 }
299 }
300 } else {
301 for (int i = writeSpinCount; i > 0; i --) {
302 localWrittenBytes = buf.transferTo(ch, raddr);
303 if (localWrittenBytes != 0) {
304 writtenBytes += localWrittenBytes;
305 break;
306 }
307 if (buf.finished()) {
308 break;
309 }
310 }
311 }
312
313 if (localWrittenBytes > 0 || buf.finished()) {
314
315 buf.release();
316 ChannelFuture future = evt.getFuture();
317 channel.currentWriteEvent = null;
318 channel.currentWriteBuffer = null;
319 evt = null;
320 buf = null;
321 future.setSuccess();
322 } else {
323
324 addOpWrite = true;
325 channel.writeSuspended = true;
326 break;
327 }
328 } catch (final AsynchronousCloseException e) {
329
330 } catch (final Throwable t) {
331 buf.release();
332 ChannelFuture future = evt.getFuture();
333 channel.currentWriteEvent = null;
334 channel.currentWriteBuffer = null;
335 buf = null;
336 evt = null;
337 future.setFailure(t);
338 fireExceptionCaught(channel, t);
339 }
340 }
341 channel.inWriteNowLoop = false;
342
343
344
345
346
347
348
349 if (addOpWrite) {
350 setOpWrite(channel);
351 } else if (removeOpWrite) {
352 clearOpWrite(channel);
353 }
354 }
355
356 Channels.fireWriteComplete(channel, writtenBytes);
357 }
358
359 }