1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.InterruptedIOException;
21 import java.net.DatagramPacket;
22 import java.net.MulticastSocket;
23 import java.net.SocketAddress;
24 import java.nio.ByteBuffer;
25
26 import org.jboss.netty.buffer.ChannelBuffer;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelFuture;
29 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
30
31
32
33
34
35
36
37
38
39 class OioDatagramWorker implements Runnable {
40
41 private final OioDatagramChannel channel;
42
43 OioDatagramWorker(OioDatagramChannel channel) {
44 this.channel = channel;
45 }
46
47 public void run() {
48 channel.workerThread = Thread.currentThread();
49 final MulticastSocket socket = channel.socket;
50
51 while (channel.isOpen()) {
52 synchronized (channel.interestOpsLock) {
53 while (!channel.isReadable()) {
54 try {
55
56
57 channel.interestOpsLock.wait();
58 } catch (InterruptedException e) {
59 if (!channel.isOpen()) {
60 break;
61 }
62 }
63 }
64 }
65
66 ReceiveBufferSizePredictor predictor =
67 channel.getConfig().getReceiveBufferSizePredictor();
68
69 byte[] buf = new byte[predictor.nextReceiveBufferSize()];
70 DatagramPacket packet = new DatagramPacket(buf, buf.length);
71 try {
72 socket.receive(packet);
73 } catch (InterruptedIOException e) {
74
75
76 continue;
77 } catch (Throwable t) {
78 if (!channel.socket.isClosed()) {
79 fireExceptionCaught(channel, t);
80 }
81 break;
82 }
83
84 fireMessageReceived(
85 channel,
86 channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
87 packet.getSocketAddress());
88 }
89
90
91
92 channel.workerThread = null;
93
94
95 close(channel, succeededFuture(channel));
96 }
97
98 static void write(
99 OioDatagramChannel channel, ChannelFuture future,
100 Object message, SocketAddress remoteAddress) {
101 try {
102 ChannelBuffer buf = (ChannelBuffer) message;
103 int length = buf.readableBytes();
104 ByteBuffer nioBuf = buf.toByteBuffer();
105 DatagramPacket packet;
106 if (nioBuf.hasArray()) {
107
108 packet = new DatagramPacket(
109 nioBuf.array(), nioBuf.arrayOffset(), length);
110 } else {
111
112 byte[] arrayBuf = new byte[length];
113 buf.getBytes(0, arrayBuf);
114 packet = new DatagramPacket(arrayBuf, length);
115 }
116
117 if (remoteAddress != null) {
118 packet.setSocketAddress(remoteAddress);
119 }
120 channel.socket.send(packet);
121 fireWriteComplete(channel, length);
122 future.setSuccess();
123 } catch (Throwable t) {
124 future.setFailure(t);
125 fireExceptionCaught(channel, t);
126 }
127 }
128
129 static void setInterestOps(
130 OioDatagramChannel channel, ChannelFuture future, int interestOps) {
131
132
133 interestOps &= ~Channel.OP_WRITE;
134 interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
135
136 boolean changed = false;
137 try {
138 if (channel.getInterestOps() != interestOps) {
139 if ((interestOps & Channel.OP_READ) != 0) {
140 channel.setInterestOpsNow(Channel.OP_READ);
141 } else {
142 channel.setInterestOpsNow(Channel.OP_NONE);
143 }
144 changed = true;
145 }
146
147 future.setSuccess();
148 if (changed) {
149 synchronized (channel.interestOpsLock) {
150 channel.setInterestOpsNow(interestOps);
151
152
153 Thread currentThread = Thread.currentThread();
154 Thread workerThread = channel.workerThread;
155 if (workerThread != null && currentThread != workerThread) {
156 workerThread.interrupt();
157 }
158 }
159
160 fireChannelInterestChanged(channel);
161 }
162 } catch (Throwable t) {
163 future.setFailure(t);
164 fireExceptionCaught(channel, t);
165 }
166 }
167
168 static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
169 boolean connected = channel.isConnected();
170 try {
171 channel.socket.disconnect();
172 future.setSuccess();
173 if (connected) {
174
175 Thread workerThread = channel.workerThread;
176 if (workerThread != null) {
177 try {
178 workerThread.setName(
179 "Old I/O datagram worker (" + channel + ')');
180 } catch (SecurityException e) {
181
182 }
183 }
184
185
186 fireChannelDisconnected(channel);
187 }
188 } catch (Throwable t) {
189 future.setFailure(t);
190 fireExceptionCaught(channel, t);
191 }
192 }
193
194 static void close(OioDatagramChannel channel, ChannelFuture future) {
195 boolean connected = channel.isConnected();
196 boolean bound = channel.isBound();
197 try {
198 channel.socket.close();
199 if (channel.setClosed()) {
200 future.setSuccess();
201 if (connected) {
202
203 Thread currentThread = Thread.currentThread();
204 Thread workerThread = channel.workerThread;
205 if (workerThread != null && currentThread != workerThread) {
206 workerThread.interrupt();
207 }
208 fireChannelDisconnected(channel);
209 }
210 if (bound) {
211 fireChannelUnbound(channel);
212 }
213 fireChannelClosed(channel);
214 } else {
215 future.setSuccess();
216 }
217 } catch (Throwable t) {
218 future.setFailure(t);
219 fireExceptionCaught(channel, t);
220 }
221 }
222 }