View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Queue;
29  import java.util.concurrent.Executor;
30  
31  import org.jboss.netty.buffer.ChannelBuffer;
32  import org.jboss.netty.buffer.ChannelBufferFactory;
33  import org.jboss.netty.channel.ChannelException;
34  import org.jboss.netty.channel.ChannelFuture;
35  import org.jboss.netty.channel.Channels;
36  import org.jboss.netty.channel.MessageEvent;
37  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
38  
39  /**
40   * A class responsible for registering channels with {@link Selector}.
41   * It also implements the {@link Selector} loop.
42   */
43  public class NioDatagramWorker extends AbstractNioWorker {
44  
45      private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
46  
47      /**
48       * Sole constructor.
49       *
50       * @param executor the {@link Executor} used to execute {@link Runnable}s
51       *                 such as {@link ChannelRegistionTask}
52       */
53      NioDatagramWorker(final Executor executor) {
54          super(executor);
55      }
56  
57      /**
58       *
59       * @deprecated use {@link #NioDatagramWorker(Executor)}
60       */
61      @Deprecated
62      NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) {
63          super(executor, allowShutdownOnIdle);
64      }
65  
66      @Override
67      protected boolean read(final SelectionKey key) {
68          final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
69          ReceiveBufferSizePredictor predictor =
70              channel.getConfig().getReceiveBufferSizePredictor();
71          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
72          final DatagramChannel nioChannel = (DatagramChannel) key.channel();
73          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
74  
75          final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
76  
77          boolean failure = true;
78          SocketAddress remoteAddress = null;
79          try {
80              // Receive from the channel in a non blocking mode. We have already been notified that
81              // the channel is ready to receive.
82              remoteAddress = nioChannel.receive(byteBuffer);
83              failure = false;
84          } catch (ClosedChannelException e) {
85              // Can happen, and does not need a user attention.
86          } catch (Throwable t) {
87              fireExceptionCaught(channel, t);
88          }
89  
90          if (remoteAddress != null) {
91              // Flip the buffer so that we can wrap it.
92              byteBuffer.flip();
93  
94              int readBytes = byteBuffer.remaining();
95              if (readBytes > 0) {
96                  // Update the predictor.
97                  predictor.previousReceiveBufferSize(readBytes);
98  
99                  final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
100                 buffer.setBytes(0, byteBuffer);
101                 buffer.writerIndex(readBytes);
102 
103                 // Update the predictor.
104                 predictor.previousReceiveBufferSize(readBytes);
105 
106                 // Notify the interested parties about the newly arrived message.
107                 fireMessageReceived(
108                         channel, buffer, remoteAddress);
109             }
110         }
111 
112         if (failure) {
113             key.cancel(); // Some JDK implementations run into an infinite loop without this.
114             close(channel, succeededFuture(channel));
115             return false;
116         }
117 
118         return true;
119     }
120 
121 
122     @Override
123     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
124         final Thread workerThread = thread;
125         if (workerThread == null || Thread.currentThread() != workerThread) {
126             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
127                 // "add" the channels writeTask to the writeTaskQueue.
128                 boolean offered = writeTaskQueue.offer(channel.writeTask);
129                 assert offered;
130             }
131 
132             final Selector selector = this.selector;
133             if (selector != null) {
134                 if (wakenUp.compareAndSet(false, true)) {
135                     selector.wakeup();
136                 }
137             }
138             return true;
139         }
140 
141         return false;
142     }
143 
144 
145     static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
146         boolean connected = channel.isConnected();
147         boolean iothread = isIoThread(channel);
148         try {
149             channel.getDatagramChannel().disconnect();
150             future.setSuccess();
151             if (connected) {
152                 if (iothread) {
153                     fireChannelDisconnected(channel);
154                 } else {
155                     fireChannelDisconnectedLater(channel);
156                 }
157             }
158         } catch (Throwable t) {
159             future.setFailure(t);
160             if (iothread) {
161                 fireExceptionCaught(channel, t);
162             } else {
163                 fireExceptionCaughtLater(channel, t);
164             }
165         }
166     }
167 
168 
169     @Override
170     protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
171         return new ChannelRegistionTask((NioDatagramChannel) channel, future);
172     }
173 
174     /**
175      * RegisterTask is a task responsible for registering a channel with a
176      * selector.
177      */
178     private final class ChannelRegistionTask implements Runnable {
179         private final NioDatagramChannel channel;
180 
181         private final ChannelFuture future;
182 
183         ChannelRegistionTask(final NioDatagramChannel channel,
184                 final ChannelFuture future) {
185             this.channel = channel;
186             this.future = future;
187         }
188 
189         /**
190          * This runnable's task. Does the actual registering by calling the
191          * underlying DatagramChannels peer DatagramSocket register method.
192          */
193         public void run() {
194             final SocketAddress localAddress = channel.getLocalAddress();
195             if (localAddress == null) {
196                 if (future != null) {
197                     future.setFailure(new ClosedChannelException());
198                 }
199                 close(channel, succeededFuture(channel));
200                 return;
201             }
202 
203             try {
204                 synchronized (channel.interestOpsLock) {
205                     channel.getDatagramChannel().register(
206                             selector, channel.getRawInterestOps(), channel);
207                 }
208                 if (future != null) {
209                     future.setSuccess();
210                 }
211             } catch (final IOException e) {
212                 if (future != null) {
213                     future.setFailure(e);
214                 }
215                 close(channel, succeededFuture(channel));
216 
217                 if (!(e instanceof ClosedChannelException)) {
218                     throw new ChannelException(
219                             "Failed to register a socket to the selector.", e);
220                 }
221             }
222         }
223     }
224 
225     @Override
226     public void writeFromUserCode(final AbstractNioChannel<?> channel) {
227         /*
228          * Note that we are not checking if the channel is connected. Connected
229          * has a different meaning in UDP and means that the channels socket is
230          * configured to only send and receive from a given remote peer.
231          */
232         if (!channel.isBound()) {
233             cleanUpWriteBuffer(channel);
234             return;
235         }
236 
237         if (scheduleWriteIfNecessary(channel)) {
238             return;
239         }
240 
241         // From here, we are sure Thread.currentThread() == workerThread.
242 
243         if (channel.writeSuspended) {
244             return;
245         }
246 
247         if (channel.inWriteNowLoop) {
248             return;
249         }
250 
251         write0(channel);
252     }
253 
254     @Override
255     protected void write0(final AbstractNioChannel<?> channel) {
256 
257         boolean addOpWrite = false;
258         boolean removeOpWrite = false;
259 
260         long writtenBytes = 0;
261 
262         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
263         final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
264         final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
265         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
266         synchronized (channel.writeLock) {
267             // inform the channel that write is in-progress
268             channel.inWriteNowLoop = true;
269 
270             // loop forever...
271             for (;;) {
272                 MessageEvent evt = channel.currentWriteEvent;
273                 SocketSendBufferPool.SendBuffer buf;
274                 if (evt == null) {
275                     if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
276                         removeOpWrite = true;
277                         channel.writeSuspended = false;
278                         break;
279                     }
280 
281                     channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
282                 } else {
283                     buf = channel.currentWriteBuffer;
284                 }
285 
286                 try {
287                     long localWrittenBytes = 0;
288                     SocketAddress raddr = evt.getRemoteAddress();
289                     if (raddr == null) {
290                         for (int i = writeSpinCount; i > 0; i --) {
291                             localWrittenBytes = buf.transferTo(ch);
292                             if (localWrittenBytes != 0) {
293                                 writtenBytes += localWrittenBytes;
294                                 break;
295                             }
296                             if (buf.finished()) {
297                                 break;
298                             }
299                         }
300                     } else {
301                         for (int i = writeSpinCount; i > 0; i --) {
302                             localWrittenBytes = buf.transferTo(ch, raddr);
303                             if (localWrittenBytes != 0) {
304                                 writtenBytes += localWrittenBytes;
305                                 break;
306                             }
307                             if (buf.finished()) {
308                                 break;
309                             }
310                         }
311                     }
312 
313                     if (localWrittenBytes > 0 || buf.finished()) {
314                         // Successful write - proceed to the next message.
315                         buf.release();
316                         ChannelFuture future = evt.getFuture();
317                         channel.currentWriteEvent = null;
318                         channel.currentWriteBuffer = null;
319                         evt = null;
320                         buf = null;
321                         future.setSuccess();
322                     } else {
323                         // Not written at all - perhaps the kernel buffer is full.
324                         addOpWrite = true;
325                         channel.writeSuspended = true;
326                         break;
327                     }
328                 } catch (final AsynchronousCloseException e) {
329                     // Doesn't need a user attention - ignore.
330                 } catch (final Throwable t) {
331                     buf.release();
332                     ChannelFuture future = evt.getFuture();
333                     channel.currentWriteEvent = null;
334                     channel.currentWriteBuffer = null;
335                     buf = null;
336                     evt = null;
337                     future.setFailure(t);
338                     fireExceptionCaught(channel, t);
339                 }
340             }
341             channel.inWriteNowLoop = false;
342 
343             // Initially, the following block was executed after releasing
344             // the writeLock, but there was a race condition, and it has to be
345             // executed before releasing the writeLock:
346             //
347             // https://issues.jboss.org/browse/NETTY-410
348             //
349             if (addOpWrite) {
350                 setOpWrite(channel);
351             } else if (removeOpWrite) {
352                 clearOpWrite(channel);
353             }
354         }
355 
356         Channels.fireWriteComplete(channel, writtenBytes);
357     }
358 
359 }