1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import org.jboss.netty.channel.AbstractChannelSink;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelException;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelState;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.MessageEvent;
29
30 final class LocalServerChannelSink extends AbstractChannelSink {
31
32 LocalServerChannelSink() {
33 super();
34 }
35
36 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
37 Channel channel = e.getChannel();
38 if (channel instanceof DefaultLocalServerChannel) {
39 handleServerChannel(e);
40 } else if (channel instanceof DefaultLocalChannel) {
41 handleAcceptedChannel(e);
42 }
43 }
44
45 private static void handleServerChannel(ChannelEvent e) {
46 if (!(e instanceof ChannelStateEvent)) {
47 return;
48 }
49
50 ChannelStateEvent event = (ChannelStateEvent) e;
51 DefaultLocalServerChannel channel =
52 (DefaultLocalServerChannel) event.getChannel();
53 ChannelFuture future = event.getFuture();
54 ChannelState state = event.getState();
55 Object value = event.getValue();
56 switch (state) {
57 case OPEN:
58 if (Boolean.FALSE.equals(value)) {
59 close(channel, future);
60 }
61 break;
62 case BOUND:
63 if (value != null) {
64 bind(channel, future, (LocalAddress) value);
65 } else {
66 close(channel, future);
67 }
68 break;
69 }
70 }
71
72 private static void handleAcceptedChannel(ChannelEvent e) {
73 if (e instanceof ChannelStateEvent) {
74 ChannelStateEvent event = (ChannelStateEvent) e;
75 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
76 ChannelFuture future = event.getFuture();
77 ChannelState state = event.getState();
78 Object value = event.getValue();
79
80 switch (state) {
81 case OPEN:
82 if (Boolean.FALSE.equals(value)) {
83 channel.closeNow(future);
84 }
85 break;
86 case BOUND:
87 case CONNECTED:
88 if (value == null) {
89 channel.closeNow(future);
90 }
91 break;
92 case INTEREST_OPS:
93
94 future.setSuccess();
95 break;
96 }
97 } else if (e instanceof MessageEvent) {
98 MessageEvent event = (MessageEvent) e;
99 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
100 boolean offered = channel.writeBuffer.offer(event);
101 assert offered;
102 channel.flushWriteBuffer();
103 }
104 }
105
106 private static void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
107 try {
108 if (!LocalChannelRegistry.register(localAddress, channel)) {
109 throw new ChannelException("address already in use: " + localAddress);
110 }
111 if (!channel.bound.compareAndSet(false, true)) {
112 throw new ChannelException("already bound");
113 }
114
115 channel.localAddress = localAddress;
116 future.setSuccess();
117 fireChannelBound(channel, localAddress);
118 } catch (Throwable t) {
119 LocalChannelRegistry.unregister(localAddress);
120 future.setFailure(t);
121 fireExceptionCaught(channel, t);
122 }
123 }
124
125 private static void close(DefaultLocalServerChannel channel, ChannelFuture future) {
126 try {
127 if (channel.setClosed()) {
128 future.setSuccess();
129 LocalAddress localAddress = channel.localAddress;
130 if (channel.bound.compareAndSet(true, false)) {
131 channel.localAddress = null;
132 LocalChannelRegistry.unregister(localAddress);
133 fireChannelUnbound(channel);
134 }
135 fireChannelClosed(channel);
136 } else {
137 future.setSuccess();
138 }
139 } catch (Throwable t) {
140 future.setFailure(t);
141 fireExceptionCaught(channel, t);
142 }
143 }
144 }