View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.InterruptedIOException;
21  import java.net.DatagramPacket;
22  import java.net.MulticastSocket;
23  import java.net.SocketAddress;
24  import java.nio.ByteBuffer;
25  
26  import org.jboss.netty.buffer.ChannelBuffer;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelFuture;
29  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
30  
31  /**
32   *
33   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
34   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
35   *
36   * @version $Rev: 2341 $, $Date: 2010-07-07 13:44:23 +0900 (Wed, 07 Jul 2010) $
37   *
38   */
39  class OioDatagramWorker implements Runnable {
40  
41      private final OioDatagramChannel channel;
42  
43      OioDatagramWorker(OioDatagramChannel channel) {
44          this.channel = channel;
45      }
46  
47      public void run() {
48          channel.workerThread = Thread.currentThread();
49          final MulticastSocket socket = channel.socket;
50  
51          while (channel.isOpen()) {
52              synchronized (channel.interestOpsLock) {
53                  while (!channel.isReadable()) {
54                      try {
55                          // notify() is not called at all.
56                          // close() and setInterestOps() calls Thread.interrupt()
57                          channel.interestOpsLock.wait();
58                      } catch (InterruptedException e) {
59                          if (!channel.isOpen()) {
60                              break;
61                          }
62                      }
63                  }
64              }
65  
66              ReceiveBufferSizePredictor predictor =
67                  channel.getConfig().getReceiveBufferSizePredictor();
68  
69              byte[] buf = new byte[predictor.nextReceiveBufferSize()];
70              DatagramPacket packet = new DatagramPacket(buf, buf.length);
71              try {
72                  socket.receive(packet);
73              } catch (InterruptedIOException e) {
74                  // Can happen on interruption.
75                  // Keep receiving unless the channel is closed.
76                  continue;
77              } catch (Throwable t) {
78                  if (!channel.socket.isClosed()) {
79                      fireExceptionCaught(channel, t);
80                  }
81                  break;
82              }
83  
84              fireMessageReceived(
85                      channel,
86                      channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
87                      packet.getSocketAddress());
88          }
89  
90          // Setting the workerThread to null will prevent any channel
91          // operations from interrupting this thread from now on.
92          channel.workerThread = null;
93  
94          // Clean up.
95          close(channel, succeededFuture(channel));
96      }
97  
98      static void write(
99              OioDatagramChannel channel, ChannelFuture future,
100             Object message, SocketAddress remoteAddress) {
101         try {
102             ChannelBuffer buf = (ChannelBuffer) message;
103             int length = buf.readableBytes();
104             ByteBuffer nioBuf = buf.toByteBuffer();
105             DatagramPacket packet;
106             if (nioBuf.hasArray()) {
107                 // Avoid copy if the buffer is backed by an array.
108                 packet = new DatagramPacket(
109                         nioBuf.array(), nioBuf.arrayOffset(), length);
110             } else {
111                 // Otherwise it will be expensive.
112                 byte[] arrayBuf = new byte[length];
113                 buf.getBytes(0, arrayBuf);
114                 packet = new DatagramPacket(arrayBuf, length);
115             }
116 
117             if (remoteAddress != null) {
118                 packet.setSocketAddress(remoteAddress);
119             }
120             channel.socket.send(packet);
121             fireWriteComplete(channel, length);
122             future.setSuccess();
123         } catch (Throwable t) {
124             future.setFailure(t);
125             fireExceptionCaught(channel, t);
126         }
127     }
128 
129     static void setInterestOps(
130             OioDatagramChannel channel, ChannelFuture future, int interestOps) {
131 
132         // Override OP_WRITE flag - a user cannot change this flag.
133         interestOps &= ~Channel.OP_WRITE;
134         interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
135 
136         boolean changed = false;
137         try {
138             if (channel.getInterestOps() != interestOps) {
139                 if ((interestOps & Channel.OP_READ) != 0) {
140                     channel.setInterestOpsNow(Channel.OP_READ);
141                 } else {
142                     channel.setInterestOpsNow(Channel.OP_NONE);
143                 }
144                 changed = true;
145             }
146 
147             future.setSuccess();
148             if (changed) {
149                 synchronized (channel.interestOpsLock) {
150                     channel.setInterestOpsNow(interestOps);
151 
152                     // Notify the worker so it stops or continues reading.
153                     Thread currentThread = Thread.currentThread();
154                     Thread workerThread = channel.workerThread;
155                     if (workerThread != null && currentThread != workerThread) {
156                         workerThread.interrupt();
157                     }
158                 }
159 
160                 fireChannelInterestChanged(channel);
161             }
162         } catch (Throwable t) {
163             future.setFailure(t);
164             fireExceptionCaught(channel, t);
165         }
166     }
167 
168     static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
169         boolean connected = channel.isConnected();
170         try {
171             channel.socket.disconnect();
172             future.setSuccess();
173             if (connected) {
174                 // Update the worker's thread name to reflect the state change.
175                 Thread workerThread = channel.workerThread;
176                 if (workerThread != null) {
177                     try {
178                         workerThread.setName(
179                                 "Old I/O datagram worker (" + channel + ')');
180                     } catch (SecurityException e) {
181                         // Ignore.
182                     }
183                 }
184 
185                 // Notify.
186                 fireChannelDisconnected(channel);
187             }
188         } catch (Throwable t) {
189             future.setFailure(t);
190             fireExceptionCaught(channel, t);
191         }
192     }
193 
194     static void close(OioDatagramChannel channel, ChannelFuture future) {
195         boolean connected = channel.isConnected();
196         boolean bound = channel.isBound();
197         try {
198             channel.socket.close();
199             if (channel.setClosed()) {
200                 future.setSuccess();
201                 if (connected) {
202                     // Notify the worker so it stops reading.
203                     Thread currentThread = Thread.currentThread();
204                     Thread workerThread = channel.workerThread;
205                     if (workerThread != null && currentThread != workerThread) {
206                         workerThread.interrupt();
207                     }
208                     fireChannelDisconnected(channel);
209                 }
210                 if (bound) {
211                     fireChannelUnbound(channel);
212                 }
213                 fireChannelClosed(channel);
214             } else {
215                 future.setSuccess();
216             }
217         } catch (Throwable t) {
218             future.setFailure(t);
219             fireExceptionCaught(channel, t);
220         }
221     }
222 }