View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.OutputStream;
21  import java.io.PushbackInputStream;
22  import java.net.SocketException;
23  import java.nio.channels.ClosedChannelException;
24  import java.util.regex.Pattern;
25  
26  import org.jboss.netty.buffer.ChannelBuffer;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelFuture;
29  
30  /**
31   *
32   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
33   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
34   *
35   * @version $Rev: 2307 $, $Date: 2010-06-16 12:33:29 +0900 (Wed, 16 Jun 2010) $
36   *
37   */
38  class OioWorker implements Runnable {
39  
40      private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
41              "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
42  
43      private final OioSocketChannel channel;
44  
45      OioWorker(OioSocketChannel channel) {
46          this.channel = channel;
47      }
48  
49      public void run() {
50          channel.workerThread = Thread.currentThread();
51          final PushbackInputStream in = channel.getInputStream();
52  
53          while (channel.isOpen()) {
54              synchronized (channel.interestOpsLock) {
55                  while (!channel.isReadable()) {
56                      try {
57                          // notify() is not called at all.
58                          // close() and setInterestOps() calls Thread.interrupt()
59                          channel.interestOpsLock.wait();
60                      } catch (InterruptedException e) {
61                          if (!channel.isOpen()) {
62                              break;
63                          }
64                      }
65                  }
66              }
67  
68              byte[] buf;
69              int readBytes;
70              try {
71                  int bytesToRead = in.available();
72                  if (bytesToRead > 0) {
73                      buf = new byte[bytesToRead];
74                      readBytes = in.read(buf);
75                  } else {
76                      int b = in.read();
77                      if (b < 0) {
78                          break;
79                      }
80                      in.unread(b);
81                      continue;
82                  }
83              } catch (Throwable t) {
84                  if (!channel.socket.isClosed()) {
85                      fireExceptionCaught(channel, t);
86                  }
87                  break;
88              }
89  
90              fireMessageReceived(
91                      channel,
92                      channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
93          }
94  
95          // Setting the workerThread to null will prevent any channel
96          // operations from interrupting this thread from now on.
97          channel.workerThread = null;
98  
99          // Clean up.
100         close(channel, succeededFuture(channel));
101     }
102 
103     static void write(
104             OioSocketChannel channel, ChannelFuture future,
105             Object message) {
106 
107         OutputStream out = channel.getOutputStream();
108         if (out == null) {
109             Exception e = new ClosedChannelException();
110             future.setFailure(e);
111             fireExceptionCaught(channel, e);
112             return;
113         }
114 
115         try {
116             ChannelBuffer a = (ChannelBuffer) message;
117             int length = a.readableBytes();
118             synchronized (out) {
119                 a.getBytes(a.readerIndex(), out, length);
120             }
121             fireWriteComplete(channel, length);
122             future.setSuccess();
123         } catch (Throwable t) {
124             // Convert 'SocketException: Socket closed' to
125             // ClosedChannelException.
126             if (t instanceof SocketException &&
127                     SOCKET_CLOSED_MESSAGE.matcher(
128                             String.valueOf(t.getMessage())).matches()) {
129                 t = new ClosedChannelException();
130             }
131             future.setFailure(t);
132             fireExceptionCaught(channel, t);
133         }
134     }
135 
136     static void setInterestOps(
137             OioSocketChannel channel, ChannelFuture future, int interestOps) {
138 
139         // Override OP_WRITE flag - a user cannot change this flag.
140         interestOps &= ~Channel.OP_WRITE;
141         interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
142 
143         boolean changed = false;
144         try {
145             if (channel.getInterestOps() != interestOps) {
146                 if ((interestOps & Channel.OP_READ) != 0) {
147                     channel.setInterestOpsNow(Channel.OP_READ);
148                 } else {
149                     channel.setInterestOpsNow(Channel.OP_NONE);
150                 }
151                 changed = true;
152             }
153 
154             future.setSuccess();
155             if (changed) {
156                 synchronized (channel.interestOpsLock) {
157                     channel.setInterestOpsNow(interestOps);
158 
159                     // Notify the worker so it stops or continues reading.
160                     Thread currentThread = Thread.currentThread();
161                     Thread workerThread = channel.workerThread;
162                     if (workerThread != null && currentThread != workerThread) {
163                         workerThread.interrupt();
164                     }
165                 }
166 
167                 fireChannelInterestChanged(channel);
168             }
169         } catch (Throwable t) {
170             future.setFailure(t);
171             fireExceptionCaught(channel, t);
172         }
173     }
174 
175     static void close(OioSocketChannel channel, ChannelFuture future) {
176         boolean connected = channel.isConnected();
177         boolean bound = channel.isBound();
178         try {
179             channel.socket.close();
180             if (channel.setClosed()) {
181                 future.setSuccess();
182                 if (connected) {
183                     // Notify the worker so it stops reading.
184                     Thread currentThread = Thread.currentThread();
185                     Thread workerThread = channel.workerThread;
186                     if (workerThread != null && currentThread != workerThread) {
187                         workerThread.interrupt();
188                     }
189                     fireChannelDisconnected(channel);
190                 }
191                 if (bound) {
192                     fireChannelUnbound(channel);
193                 }
194                 fireChannelClosed(channel);
195             } else {
196                 future.setSuccess();
197             }
198         } catch (Throwable t) {
199             future.setFailure(t);
200             fireExceptionCaught(channel, t);
201         }
202     }
203 }