View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.net.SocketTimeoutException;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.jboss.netty.channel.Channel;
33  import org.jboss.netty.channel.ChannelEvent;
34  import org.jboss.netty.channel.ChannelFuture;
35  import org.jboss.netty.channel.ChannelPipeline;
36  import org.jboss.netty.channel.ChannelState;
37  import org.jboss.netty.channel.ChannelStateEvent;
38  import org.jboss.netty.channel.MessageEvent;
39  import org.jboss.netty.logging.InternalLogger;
40  import org.jboss.netty.logging.InternalLoggerFactory;
41  import org.jboss.netty.util.ThreadRenamingRunnable;
42  import org.jboss.netty.util.internal.DeadLockProofWorker;
43  
44  class NioServerSocketPipelineSink extends AbstractNioChannelSink {
45  
46      private static final AtomicInteger nextId = new AtomicInteger();
47  
48      static final InternalLogger logger =
49          InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
50  
51      final int id = nextId.incrementAndGet();
52  
53      private final WorkerPool<NioWorker> workerPool;
54  
55      NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
56          this.workerPool = workerPool;
57      }
58  
59  
60      public void eventSunk(
61              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
62          Channel channel = e.getChannel();
63          if (channel instanceof NioServerSocketChannel) {
64              handleServerSocket(e);
65          } else if (channel instanceof NioSocketChannel) {
66              handleAcceptedSocket(e);
67          }
68      }
69  
70      private void handleServerSocket(ChannelEvent e) {
71          if (!(e instanceof ChannelStateEvent)) {
72              return;
73          }
74  
75          ChannelStateEvent event = (ChannelStateEvent) e;
76          NioServerSocketChannel channel =
77              (NioServerSocketChannel) event.getChannel();
78          ChannelFuture future = event.getFuture();
79          ChannelState state = event.getState();
80          Object value = event.getValue();
81  
82          switch (state) {
83          case OPEN:
84              if (Boolean.FALSE.equals(value)) {
85                  close(channel, future);
86              }
87              break;
88          case BOUND:
89              if (value != null) {
90                  bind(channel, future, (SocketAddress) value);
91              } else {
92                  close(channel, future);
93              }
94              break;
95          default:
96              break;
97          }
98      }
99  
100     private static void handleAcceptedSocket(ChannelEvent e) {
101         if (e instanceof ChannelStateEvent) {
102             ChannelStateEvent event = (ChannelStateEvent) e;
103             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
104             ChannelFuture future = event.getFuture();
105             ChannelState state = event.getState();
106             Object value = event.getValue();
107 
108             switch (state) {
109             case OPEN:
110                 if (Boolean.FALSE.equals(value)) {
111                     channel.worker.close(channel, future);
112                 }
113                 break;
114             case BOUND:
115             case CONNECTED:
116                 if (value == null) {
117                     channel.worker.close(channel, future);
118                 }
119                 break;
120             case INTEREST_OPS:
121                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
122                 break;
123             }
124         } else if (e instanceof MessageEvent) {
125             MessageEvent event = (MessageEvent) e;
126             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
127             boolean offered = channel.writeBufferQueue.offer(event);
128             assert offered;
129             channel.worker.writeFromUserCode(channel);
130         }
131     }
132 
133     private void bind(
134             NioServerSocketChannel channel, ChannelFuture future,
135             SocketAddress localAddress) {
136 
137         boolean bound = false;
138         boolean bossStarted = false;
139         try {
140             channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
141             bound = true;
142 
143             future.setSuccess();
144             fireChannelBound(channel, channel.getLocalAddress());
145 
146             Executor bossExecutor =
147                 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
148             DeadLockProofWorker.start(bossExecutor,
149                     new ThreadRenamingRunnable(new Boss(channel),
150                             "New I/O server boss #" + id + " (" + channel + ')'));
151             bossStarted = true;
152         } catch (Throwable t) {
153             future.setFailure(t);
154             fireExceptionCaught(channel, t);
155         } finally {
156             if (!bossStarted && bound) {
157                 close(channel, future);
158             }
159         }
160     }
161 
162     private static void close(NioServerSocketChannel channel, ChannelFuture future) {
163         boolean bound = channel.isBound();
164         try {
165             if (channel.socket.isOpen()) {
166                 channel.socket.close();
167                 Selector selector = channel.selector;
168                 if (selector != null) {
169                     selector.wakeup();
170                 }
171             }
172 
173             // Make sure the boss thread is not running so that that the future
174             // is notified after a new connection cannot be accepted anymore.
175             // See NETTY-256 for more information.
176             channel.shutdownLock.lock();
177             try {
178                 if (channel.setClosed()) {
179                     future.setSuccess();
180                     if (bound) {
181                         fireChannelUnbound(channel);
182                     }
183                     fireChannelClosed(channel);
184                 } else {
185                     future.setSuccess();
186                 }
187             } finally {
188                 channel.shutdownLock.unlock();
189             }
190         } catch (Throwable t) {
191             future.setFailure(t);
192             fireExceptionCaught(channel, t);
193         }
194     }
195 
196     NioWorker nextWorker() {
197         return workerPool.nextWorker();
198     }
199 
200     private final class Boss implements Runnable {
201         private final Selector selector;
202         private final NioServerSocketChannel channel;
203 
204         Boss(NioServerSocketChannel channel) throws IOException {
205             this.channel = channel;
206 
207             selector = Selector.open();
208 
209             boolean registered = false;
210             try {
211                 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
212                 registered = true;
213             } finally {
214                 if (!registered) {
215                     closeSelector();
216                 }
217             }
218 
219             channel.selector = selector;
220         }
221 
222         public void run() {
223             final Thread currentThread = Thread.currentThread();
224 
225             channel.shutdownLock.lock();
226             try {
227                 for (;;) {
228                     try {
229                         if (selector.select(1000) > 0) {
230                             // There was something selected if we reach this point, so clear
231                             // the selected keys
232                             selector.selectedKeys().clear();
233                         }
234 
235                         // accept connections in a for loop until no new connection is ready
236                         for (;;) {
237                             SocketChannel acceptedSocket = channel.socket.accept();
238                             if (acceptedSocket == null) {
239                                 break;
240                             }
241                             registerAcceptedChannel(acceptedSocket, currentThread);
242 
243                         }
244 
245                     } catch (SocketTimeoutException e) {
246                         // Thrown every second to get ClosedChannelException
247                         // raised.
248                     } catch (CancelledKeyException e) {
249                         // Raised by accept() when the server socket was closed.
250                     } catch (ClosedSelectorException e) {
251                         // Raised by accept() when the server socket was closed.
252                     } catch (ClosedChannelException e) {
253                         // Closed as requested.
254                         break;
255                     } catch (Throwable e) {
256                         if (logger.isWarnEnabled()) {
257                             logger.warn(
258                                     "Failed to accept a connection.", e);
259                         }
260 
261                         try {
262                             Thread.sleep(1000);
263                         } catch (InterruptedException e1) {
264                             // Ignore
265                         }
266                     }
267                 }
268             } finally {
269                 channel.shutdownLock.unlock();
270                 closeSelector();
271             }
272         }
273 
274         private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
275             try {
276                 ChannelPipeline pipeline =
277                     channel.getConfig().getPipelineFactory().getPipeline();
278                 NioWorker worker = nextWorker();
279                 worker.register(new NioAcceptedSocketChannel(
280                         channel.getFactory(), pipeline, channel,
281                         NioServerSocketPipelineSink.this, acceptedSocket,
282                         worker, currentThread), null);
283             } catch (Exception e) {
284                 if (logger.isWarnEnabled()) {
285                     logger.warn(
286                             "Failed to initialize an accepted socket.", e);
287                 }
288 
289                 try {
290                     acceptedSocket.close();
291                 } catch (IOException e2) {
292                     if (logger.isWarnEnabled()) {
293                         logger.warn(
294                                 "Failed to close a partially accepted socket.",
295                                 e2);
296                     }
297 
298                 }
299             }
300         }
301 
302         private void closeSelector() {
303             channel.selector = null;
304             try {
305                 selector.close();
306             } catch (Exception e) {
307                 if (logger.isWarnEnabled()) {
308                     logger.warn("Failed to close a selector.", e);
309                 }
310             }
311         }
312     }
313 }