View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.ConnectException;
21  
22  import org.jboss.netty.channel.AbstractChannelSink;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelEvent;
25  import org.jboss.netty.channel.ChannelException;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelState;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  import org.jboss.netty.logging.InternalLogger;
32  import org.jboss.netty.logging.InternalLoggerFactory;
33  
34  /**
35   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
36   * @author Andy Taylor (andy.taylor@jboss.org)
37   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
38   * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
39   */
40  final class LocalClientChannelSink extends AbstractChannelSink {
41  
42      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
43  
44      LocalClientChannelSink() {
45          super();
46      }
47  
48      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
49          if (e instanceof ChannelStateEvent) {
50              ChannelStateEvent event = (ChannelStateEvent) e;
51  
52              DefaultLocalChannel channel =
53                    (DefaultLocalChannel) event.getChannel();
54              ChannelFuture future = event.getFuture();
55              ChannelState state = event.getState();
56              Object value = event.getValue();
57              switch (state) {
58              case OPEN:
59                  if (Boolean.FALSE.equals(value)) {
60                      channel.closeNow(future);
61                  }
62                  break;
63              case BOUND:
64                  if (value != null) {
65                      bind(channel, future, (LocalAddress) value);
66                  } else {
67                      channel.closeNow(future);
68                  }
69                  break;
70              case CONNECTED:
71                  if (value != null) {
72                      connect(channel, future, (LocalAddress) value);
73                  } else {
74                      channel.closeNow(future);
75                  }
76                  break;
77              case INTEREST_OPS:
78                  // Unsupported - discard silently.
79                  future.setSuccess();
80                  break;
81              }
82          }
83          else if (e instanceof MessageEvent) {
84              MessageEvent event = (MessageEvent) e;
85              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
86              boolean offered = channel.writeBuffer.offer(event);
87              assert offered;
88              channel.flushWriteBuffer();
89          }
90      }
91  
92      private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
93          try {
94              if (!LocalChannelRegistry.register(localAddress, channel)) {
95                  throw new ChannelException("address already in use: " + localAddress);
96              }
97  
98              if (!channel.bound.compareAndSet(false, true)) {
99                  throw new ChannelException("already bound");
100             }
101 
102             channel.localAddress = localAddress;
103             future.setSuccess();
104             fireChannelBound(channel, localAddress);
105         } catch (Throwable t) {
106             LocalChannelRegistry.unregister(localAddress);
107             future.setFailure(t);
108             fireExceptionCaught(channel, t);
109         }
110     }
111 
112     private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
113         Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
114         if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
115             future.setFailure(new ConnectException("connection refused"));
116             return;
117         }
118 
119         DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
120         ChannelPipeline pipeline;
121         try {
122             pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
123         } catch (Exception e) {
124             future.setFailure(e);
125             fireExceptionCaught(channel, e);
126             logger.warn(
127                     "Failed to initialize an accepted socket.", e);
128             return;
129         }
130 
131         future.setSuccess();
132         DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
133                 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
134         channel.pairedChannel = acceptedChannel;
135 
136         bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
137         channel.remoteAddress = serverChannel.getLocalAddress();
138         fireChannelConnected(channel, serverChannel.getLocalAddress());
139 
140         acceptedChannel.localAddress = serverChannel.getLocalAddress();
141         acceptedChannel.bound.set(true);
142         fireChannelBound(acceptedChannel, channel.getRemoteAddress());
143         acceptedChannel.remoteAddress = channel.getLocalAddress();
144         fireChannelConnected(acceptedChannel, channel.getLocalAddress());
145 
146         // Flush something that was written in channelBound / channelConnected
147         channel.flushWriteBuffer();
148         acceptedChannel.flushWriteBuffer();
149     }
150 }