View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.net.SocketTimeoutException;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.jboss.netty.channel.AbstractChannelSink;
33  import org.jboss.netty.channel.Channel;
34  import org.jboss.netty.channel.ChannelEvent;
35  import org.jboss.netty.channel.ChannelFuture;
36  import org.jboss.netty.channel.ChannelPipeline;
37  import org.jboss.netty.channel.ChannelState;
38  import org.jboss.netty.channel.ChannelStateEvent;
39  import org.jboss.netty.channel.MessageEvent;
40  import org.jboss.netty.logging.InternalLogger;
41  import org.jboss.netty.logging.InternalLoggerFactory;
42  import org.jboss.netty.util.ThreadRenamingRunnable;
43  import org.jboss.netty.util.internal.IoWorkerRunnable;
44  
45  /**
46   *
47   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
48   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
49   *
50   * @version $Rev: 2352 $, $Date: 2010-08-26 12:13:14 +0900 (Thu, 26 Aug 2010) $
51   *
52   */
53  class NioServerSocketPipelineSink extends AbstractChannelSink {
54  
55      static final InternalLogger logger =
56          InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
57      private static final AtomicInteger nextId = new AtomicInteger();
58  
59      private final int id = nextId.incrementAndGet();
60      private final NioWorker[] workers;
61      private final AtomicInteger workerIndex = new AtomicInteger();
62  
63      NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
64          workers = new NioWorker[workerCount];
65          for (int i = 0; i < workers.length; i ++) {
66              workers[i] = new NioWorker(id, i + 1, workerExecutor);
67          }
68      }
69  
70      public void eventSunk(
71              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
72          Channel channel = e.getChannel();
73          if (channel instanceof NioServerSocketChannel) {
74              handleServerSocket(e);
75          } else if (channel instanceof NioSocketChannel) {
76              handleAcceptedSocket(e);
77          }
78      }
79  
80      private void handleServerSocket(ChannelEvent e) {
81          if (!(e instanceof ChannelStateEvent)) {
82              return;
83          }
84  
85          ChannelStateEvent event = (ChannelStateEvent) e;
86          NioServerSocketChannel channel =
87              (NioServerSocketChannel) event.getChannel();
88          ChannelFuture future = event.getFuture();
89          ChannelState state = event.getState();
90          Object value = event.getValue();
91  
92          switch (state) {
93          case OPEN:
94              if (Boolean.FALSE.equals(value)) {
95                  close(channel, future);
96              }
97              break;
98          case BOUND:
99              if (value != null) {
100                 bind(channel, future, (SocketAddress) value);
101             } else {
102                 close(channel, future);
103             }
104             break;
105         }
106     }
107 
108     private void handleAcceptedSocket(ChannelEvent e) {
109         if (e instanceof ChannelStateEvent) {
110             ChannelStateEvent event = (ChannelStateEvent) e;
111             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
112             ChannelFuture future = event.getFuture();
113             ChannelState state = event.getState();
114             Object value = event.getValue();
115 
116             switch (state) {
117             case OPEN:
118                 if (Boolean.FALSE.equals(value)) {
119                     channel.worker.close(channel, future);
120                 }
121                 break;
122             case BOUND:
123             case CONNECTED:
124                 if (value == null) {
125                     channel.worker.close(channel, future);
126                 }
127                 break;
128             case INTEREST_OPS:
129                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
130                 break;
131             }
132         } else if (e instanceof MessageEvent) {
133             MessageEvent event = (MessageEvent) e;
134             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
135             boolean offered = channel.writeBuffer.offer(event);
136             assert offered;
137             channel.worker.writeFromUserCode(channel);
138         }
139     }
140 
141     private void bind(
142             NioServerSocketChannel channel, ChannelFuture future,
143             SocketAddress localAddress) {
144 
145         boolean bound = false;
146         boolean bossStarted = false;
147         try {
148             channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
149             bound = true;
150 
151             future.setSuccess();
152             fireChannelBound(channel, channel.getLocalAddress());
153 
154             Executor bossExecutor =
155                 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
156             bossExecutor.execute(
157                     new IoWorkerRunnable(
158                             new ThreadRenamingRunnable(
159                                     new Boss(channel),
160                                     "New I/O server boss #" + id +
161                                     " (" + channel + ')')));
162             bossStarted = true;
163         } catch (Throwable t) {
164             future.setFailure(t);
165             fireExceptionCaught(channel, t);
166         } finally {
167             if (!bossStarted && bound) {
168                 close(channel, future);
169             }
170         }
171     }
172 
173     private void close(NioServerSocketChannel channel, ChannelFuture future) {
174         boolean bound = channel.isBound();
175         try {
176             if (channel.socket.isOpen()) {
177                 channel.socket.close();
178                 Selector selector = channel.selector;
179                 if (selector != null) {
180                     selector.wakeup();
181                 }
182             }
183 
184             // Make sure the boss thread is not running so that that the future
185             // is notified after a new connection cannot be accepted anymore.
186             // See NETTY-256 for more information.
187             channel.shutdownLock.lock();
188             try {
189                 if (channel.setClosed()) {
190                     future.setSuccess();
191                     if (bound) {
192                         fireChannelUnbound(channel);
193                     }
194                     fireChannelClosed(channel);
195                 } else {
196                     future.setSuccess();
197                 }
198             } finally {
199                 channel.shutdownLock.unlock();
200             }
201         } catch (Throwable t) {
202             future.setFailure(t);
203             fireExceptionCaught(channel, t);
204         }
205     }
206 
207     NioWorker nextWorker() {
208         return workers[Math.abs(
209                 workerIndex.getAndIncrement() % workers.length)];
210     }
211 
212     private final class Boss implements Runnable {
213         private final Selector selector;
214         private final NioServerSocketChannel channel;
215 
216         Boss(NioServerSocketChannel channel) throws IOException {
217             this.channel = channel;
218 
219             selector = Selector.open();
220 
221             boolean registered = false;
222             try {
223                 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
224                 registered = true;
225             } finally {
226                 if (!registered) {
227                     closeSelector();
228                 }
229             }
230 
231             channel.selector = selector;
232         }
233 
234         public void run() {
235             final Thread currentThread = Thread.currentThread();
236 
237             channel.shutdownLock.lock();
238             try {
239                 for (;;) {
240                     try {
241                         if (selector.select(1000) > 0) {
242                             selector.selectedKeys().clear();
243                         }
244 
245                         SocketChannel acceptedSocket = channel.socket.accept();
246                         if (acceptedSocket != null) {
247                             registerAcceptedChannel(acceptedSocket, currentThread);
248                         }
249                     } catch (SocketTimeoutException e) {
250                         // Thrown every second to get ClosedChannelException
251                         // raised.
252                     } catch (CancelledKeyException e) {
253                         // Raised by accept() when the server socket was closed.
254                     } catch (ClosedSelectorException e) {
255                         // Raised by accept() when the server socket was closed.
256                     } catch (ClosedChannelException e) {
257                         // Closed as requested.
258                         break;
259                     } catch (Throwable e) {
260                         logger.warn(
261                                 "Failed to accept a connection.", e);
262                         try {
263                             Thread.sleep(1000);
264                         } catch (InterruptedException e1) {
265                             // Ignore
266                         }
267                     }
268                 }
269             } finally {
270                 channel.shutdownLock.unlock();
271                 closeSelector();
272             }
273         }
274 
275         private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
276             try {
277                 ChannelPipeline pipeline =
278                     channel.getConfig().getPipelineFactory().getPipeline();
279                 NioWorker worker = nextWorker();
280                 worker.register(new NioAcceptedSocketChannel(
281                         channel.getFactory(), pipeline, channel,
282                         NioServerSocketPipelineSink.this, acceptedSocket,
283                         worker, currentThread), null);
284             } catch (Exception e) {
285                 logger.warn(
286                         "Failed to initialize an accepted socket.", e);
287                 try {
288                     acceptedSocket.close();
289                 } catch (IOException e2) {
290                     logger.warn(
291                             "Failed to close a partially accepted socket.",
292                             e2);
293                 }
294             }
295         }
296 
297         private void closeSelector() {
298             channel.selector = null;
299             try {
300                 selector.close();
301             } catch (Exception e) {
302                 logger.warn("Failed to close a selector.", e);
303             }
304         }
305     }
306 }