View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.ConnectException;
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.util.Iterator;
27  import java.util.Queue;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.jboss.netty.channel.ChannelEvent;
36  import org.jboss.netty.channel.ChannelException;
37  import org.jboss.netty.channel.ChannelFuture;
38  import org.jboss.netty.channel.ChannelFutureListener;
39  import org.jboss.netty.channel.ChannelPipeline;
40  import org.jboss.netty.channel.ChannelState;
41  import org.jboss.netty.channel.ChannelStateEvent;
42  import org.jboss.netty.channel.MessageEvent;
43  import org.jboss.netty.logging.InternalLogger;
44  import org.jboss.netty.logging.InternalLoggerFactory;
45  import org.jboss.netty.util.ThreadRenamingRunnable;
46  import org.jboss.netty.util.internal.DeadLockProofWorker;
47  
48  class NioClientSocketPipelineSink extends AbstractNioChannelSink {
49  
50      private static final AtomicInteger nextId = new AtomicInteger();
51  
52      static final InternalLogger logger =
53          InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
54  
55      final Executor bossExecutor;
56  
57      final int id = nextId.incrementAndGet();
58      private final Boss[] bosses;
59  
60      private final AtomicInteger bossIndex = new AtomicInteger();
61  
62      private final WorkerPool<NioWorker> workerPool;
63  
64      NioClientSocketPipelineSink(
65              Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
66  
67          this.bossExecutor = bossExecutor;
68  
69          bosses = new Boss[bossCount];
70          for (int i = 0; i < bosses.length; i ++) {
71              bosses[i] = new Boss(i);
72          }
73  
74          this.workerPool = workerPool;
75      }
76  
77      public void eventSunk(
78              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
79          if (e instanceof ChannelStateEvent) {
80              ChannelStateEvent event = (ChannelStateEvent) e;
81              NioClientSocketChannel channel =
82                  (NioClientSocketChannel) event.getChannel();
83              ChannelFuture future = event.getFuture();
84              ChannelState state = event.getState();
85              Object value = event.getValue();
86  
87              switch (state) {
88              case OPEN:
89                  if (Boolean.FALSE.equals(value)) {
90                      channel.worker.close(channel, future);
91                  }
92                  break;
93              case BOUND:
94                  if (value != null) {
95                      bind(channel, future, (SocketAddress) value);
96                  } else {
97                      channel.worker.close(channel, future);
98                  }
99                  break;
100             case CONNECTED:
101                 if (value != null) {
102                     connect(channel, future, (SocketAddress) value);
103                 } else {
104                     channel.worker.close(channel, future);
105                 }
106                 break;
107             case INTEREST_OPS:
108                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
109                 break;
110             }
111         } else if (e instanceof MessageEvent) {
112             MessageEvent event = (MessageEvent) e;
113             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
114             boolean offered = channel.writeBufferQueue.offer(event);
115             assert offered;
116             channel.worker.writeFromUserCode(channel);
117         }
118     }
119 
120     private static void bind(
121             NioClientSocketChannel channel, ChannelFuture future,
122             SocketAddress localAddress) {
123         try {
124             channel.channel.socket().bind(localAddress);
125             channel.boundManually = true;
126             channel.setBound();
127             future.setSuccess();
128             fireChannelBound(channel, channel.getLocalAddress());
129         } catch (Throwable t) {
130             future.setFailure(t);
131             fireExceptionCaught(channel, t);
132         }
133     }
134 
135     private void connect(
136             final NioClientSocketChannel channel, final ChannelFuture cf,
137             SocketAddress remoteAddress) {
138         try {
139             if (channel.channel.connect(remoteAddress)) {
140                 channel.worker.register(channel, cf);
141             } else {
142                 channel.getCloseFuture().addListener(new ChannelFutureListener() {
143                     public void operationComplete(ChannelFuture f)
144                             throws Exception {
145                         if (!cf.isDone()) {
146                             cf.setFailure(new ClosedChannelException());
147                         }
148                     }
149                 });
150                 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
151                 channel.connectFuture = cf;
152                 nextBoss().register(channel);
153             }
154 
155         } catch (Throwable t) {
156             cf.setFailure(t);
157             fireExceptionCaught(channel, t);
158             channel.worker.close(channel, succeededFuture(channel));
159         }
160     }
161 
162     Boss nextBoss() {
163         return bosses[Math.abs(
164                 bossIndex.getAndIncrement() % bosses.length)];
165     }
166 
167     NioWorker nextWorker() {
168         return workerPool.nextWorker();
169     }
170 
171     private final class Boss implements Runnable {
172 
173         volatile Selector selector;
174         private boolean started;
175         private final AtomicBoolean wakenUp = new AtomicBoolean();
176         private final Object startStopLock = new Object();
177         private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
178         private final int subId;
179 
180         Boss(int subId) {
181             this.subId = subId;
182         }
183 
184         void register(NioClientSocketChannel channel) {
185             Runnable registerTask = new RegisterTask(this, channel);
186             Selector selector;
187 
188             synchronized (startStopLock) {
189                 if (!started) {
190                     // Open a selector if this worker didn't start yet.
191                     try {
192                         this.selector = selector =  Selector.open();
193                     } catch (Throwable t) {
194                         throw new ChannelException(
195                                 "Failed to create a selector.", t);
196                     }
197 
198                     // Start the worker thread with the new Selector.
199                     boolean success = false;
200                     try {
201                         DeadLockProofWorker.start(bossExecutor,
202                                 new ThreadRenamingRunnable(this,
203                                         "New I/O client boss #" + id + '-' + subId));
204 
205                         success = true;
206                     } finally {
207                         if (!success) {
208                             // Release the Selector if the execution fails.
209                             try {
210                                 selector.close();
211                             } catch (Throwable t) {
212                                 if (logger.isWarnEnabled()) {
213                                     logger.warn("Failed to close a selector.", t);
214                                 }
215                             }
216                             this.selector = selector = null;
217                             // The method will return to the caller at this point.
218                         }
219                     }
220                 } else {
221                     // Use the existing selector if this worker has been started.
222                     selector = this.selector;
223                 }
224 
225                 assert selector != null && selector.isOpen();
226 
227                 started = true;
228                 boolean offered = registerTaskQueue.offer(registerTask);
229                 assert offered;
230             }
231 
232             if (wakenUp.compareAndSet(false, true)) {
233                 selector.wakeup();
234             }
235         }
236 
237         public void run() {
238             boolean shutdown = false;
239             Selector selector = this.selector;
240             long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
241             for (;;) {
242                 wakenUp.set(false);
243 
244                 try {
245                     int selectedKeyCount = selector.select(10);
246 
247                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
248                     // before calling 'selector.wakeup()' to reduce the wake-up
249                     // overhead. (Selector.wakeup() is an expensive operation.)
250                     //
251                     // However, there is a race condition in this approach.
252                     // The race condition is triggered when 'wakenUp' is set to
253                     // true too early.
254                     //
255                     // 'wakenUp' is set to true too early if:
256                     // 1) Selector is waken up between 'wakenUp.set(false)' and
257                     //    'selector.select(...)'. (BAD)
258                     // 2) Selector is waken up between 'selector.select(...)' and
259                     //    'if (wakenUp.get()) { ... }'. (OK)
260                     //
261                     // In the first case, 'wakenUp' is set to true and the
262                     // following 'selector.select(...)' will wake up immediately.
263                     // Until 'wakenUp' is set to false again in the next round,
264                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
265                     // any attempt to wake up the Selector will fail, too, causing
266                     // the following 'selector.select(...)' call to block
267                     // unnecessarily.
268                     //
269                     // To fix this problem, we wake up the selector again if wakenUp
270                     // is true immediately after selector.select(...).
271                     // It is inefficient in that it wakes up the selector for both
272                     // the first case (BAD - wake-up required) and the second case
273                     // (OK - no wake-up required).
274 
275                     if (wakenUp.get()) {
276                         selector.wakeup();
277                     }
278 
279                     processRegisterTaskQueue();
280 
281                     if (selectedKeyCount > 0) {
282                         processSelectedKeys(selector.selectedKeys());
283                     }
284 
285                     // Handle connection timeout every 10 milliseconds approximately.
286                     long currentTimeNanos = System.nanoTime();
287                     if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
288                         lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
289                         processConnectTimeout(selector.keys(), currentTimeNanos);
290                     }
291 
292                     // Exit the loop when there's nothing to handle.
293                     // The shutdown flag is used to delay the shutdown of this
294                     // loop to avoid excessive Selector creation when
295                     // connection attempts are made in a one-by-one manner
296                     // instead of concurrent manner.
297                     if (selector.keys().isEmpty()) {
298                         if (shutdown ||
299                             bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
300 
301                             synchronized (startStopLock) {
302                                 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
303                                     started = false;
304                                     try {
305                                         selector.close();
306                                     } catch (IOException e) {
307                                         if (logger.isWarnEnabled()) {
308                                             logger.warn(
309                                                     "Failed to close a selector.", e);
310                                         }
311 
312                                     } finally {
313                                         this.selector = null;
314                                     }
315                                     break;
316                                 } else {
317                                     shutdown = false;
318                                 }
319                             }
320                         } else {
321                             // Give one more second.
322                             shutdown = true;
323                         }
324                     } else {
325                         shutdown = false;
326                     }
327                 } catch (Throwable t) {
328                     if (logger.isWarnEnabled()) {
329                         logger.warn(
330                                 "Unexpected exception in the selector loop.", t);
331                     }
332 
333 
334                     // Prevent possible consecutive immediate failures.
335                     try {
336                         Thread.sleep(1000);
337                     } catch (InterruptedException e) {
338                         // Ignore.
339                     }
340                 }
341             }
342         }
343 
344         private void processRegisterTaskQueue() {
345             for (;;) {
346                 final Runnable task = registerTaskQueue.poll();
347                 if (task == null) {
348                     break;
349                 }
350 
351                 task.run();
352             }
353         }
354 
355         private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
356             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
357                 SelectionKey k = i.next();
358                 i.remove();
359 
360                 if (!k.isValid()) {
361                     close(k);
362                     continue;
363                 }
364 
365                 if (k.isConnectable()) {
366                     connect(k);
367                 }
368             }
369         }
370 
371         private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
372             ConnectException cause = null;
373             for (SelectionKey k: keys) {
374                 if (!k.isValid()) {
375                     // Comment the close call again as it gave us major problems
376                     // with ClosedChannelExceptions.
377                     //
378                     // See:
379                     // * https://github.com/netty/netty/issues/142
380                     // * https://github.com/netty/netty/issues/138
381                     //
382                     // close(k);
383                     continue;
384                 }
385 
386                 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
387                 if (ch.connectDeadlineNanos > 0 &&
388                         currentTimeNanos >= ch.connectDeadlineNanos) {
389 
390                     if (cause == null) {
391                         cause = new ConnectException("connection timed out");
392                     }
393 
394                     ch.connectFuture.setFailure(cause);
395                     fireExceptionCaught(ch, cause);
396                     ch.worker.close(ch, succeededFuture(ch));
397                 }
398             }
399         }
400 
401         private void connect(SelectionKey k) {
402             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
403             try {
404                 if (ch.channel.finishConnect()) {
405                     k.cancel();
406                     ch.worker.register(ch, ch.connectFuture);
407                 }
408             } catch (Throwable t) {
409                 ch.connectFuture.setFailure(t);
410                 fireExceptionCaught(ch, t);
411                 k.cancel(); // Some JDK implementations run into an infinite loop without this.
412                 ch.worker.close(ch, succeededFuture(ch));
413             }
414         }
415 
416         private void close(SelectionKey k) {
417             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
418             ch.worker.close(ch, succeededFuture(ch));
419         }
420     }
421 
422     private static final class RegisterTask implements Runnable {
423         private final Boss boss;
424         private final NioClientSocketChannel channel;
425 
426         RegisterTask(Boss boss, NioClientSocketChannel channel) {
427             this.boss = boss;
428             this.channel = channel;
429         }
430 
431         public void run() {
432             try {
433                 channel.channel.register(
434                         boss.selector, SelectionKey.OP_CONNECT, channel);
435             } catch (ClosedChannelException e) {
436                 channel.worker.close(channel, succeededFuture(channel));
437             }
438 
439             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
440             if (connectTimeout > 0) {
441                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
442             }
443         }
444     }
445 }