View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import org.jboss.netty.channel.AbstractChannelSink;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelException;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelState;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.MessageEvent;
29  
30  final class LocalServerChannelSink extends AbstractChannelSink {
31  
32      LocalServerChannelSink() {
33          super();
34      }
35  
36      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
37          Channel channel = e.getChannel();
38          if (channel instanceof DefaultLocalServerChannel) {
39              handleServerChannel(e);
40          } else if (channel instanceof DefaultLocalChannel) {
41              handleAcceptedChannel(e);
42          }
43      }
44  
45      private static void handleServerChannel(ChannelEvent e) {
46          if (!(e instanceof ChannelStateEvent)) {
47              return;
48          }
49  
50          ChannelStateEvent event = (ChannelStateEvent) e;
51          DefaultLocalServerChannel channel =
52                (DefaultLocalServerChannel) event.getChannel();
53          ChannelFuture future = event.getFuture();
54          ChannelState state = event.getState();
55          Object value = event.getValue();
56          switch (state) {
57          case OPEN:
58              if (Boolean.FALSE.equals(value)) {
59                  close(channel, future);
60              }
61              break;
62          case BOUND:
63              if (value != null) {
64                  bind(channel, future, (LocalAddress) value);
65              } else {
66                  close(channel, future);
67              }
68              break;
69          }
70      }
71  
72      private static void handleAcceptedChannel(ChannelEvent e) {
73          if (e instanceof ChannelStateEvent) {
74              ChannelStateEvent event = (ChannelStateEvent) e;
75              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
76              ChannelFuture future = event.getFuture();
77              ChannelState state = event.getState();
78              Object value = event.getValue();
79  
80              switch (state) {
81              case OPEN:
82                  if (Boolean.FALSE.equals(value)) {
83                      channel.closeNow(future);
84                  }
85                  break;
86              case BOUND:
87              case CONNECTED:
88                  if (value == null) {
89                      channel.closeNow(future);
90                  }
91                  break;
92              case INTEREST_OPS:
93                  // Unsupported - discard silently.
94                  future.setSuccess();
95                  break;
96              }
97          } else if (e instanceof MessageEvent) {
98              MessageEvent event = (MessageEvent) e;
99              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
100             boolean offered = channel.writeBuffer.offer(event);
101             assert offered;
102             channel.flushWriteBuffer();
103         }
104     }
105 
106     private static void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
107         try {
108             if (!LocalChannelRegistry.register(localAddress, channel)) {
109                 throw new ChannelException("address already in use: " + localAddress);
110             }
111             if (!channel.bound.compareAndSet(false, true)) {
112                 throw new ChannelException("already bound");
113             }
114 
115             channel.localAddress = localAddress;
116             future.setSuccess();
117             fireChannelBound(channel, localAddress);
118         } catch (Throwable t) {
119             LocalChannelRegistry.unregister(localAddress);
120             future.setFailure(t);
121             fireExceptionCaught(channel, t);
122         }
123     }
124 
125     private static void close(DefaultLocalServerChannel channel, ChannelFuture future) {
126         try {
127             if (channel.setClosed()) {
128                 future.setSuccess();
129                 LocalAddress localAddress = channel.localAddress;
130                 if (channel.bound.compareAndSet(true, false)) {
131                     channel.localAddress = null;
132                     LocalChannelRegistry.unregister(localAddress);
133                     fireChannelUnbound(channel);
134                 }
135                 fireChannelClosed(channel);
136             } else {
137                 future.setSuccess();
138             }
139         } catch (Throwable t) {
140             future.setFailure(t);
141             fireExceptionCaught(channel, t);
142         }
143     }
144 }