1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.ConnectException;
21
22 import org.jboss.netty.channel.AbstractChannelSink;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelException;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelState;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.logging.InternalLogger;
32 import org.jboss.netty.logging.InternalLoggerFactory;
33
34
35
36
37
38
39
40 final class LocalClientChannelSink extends AbstractChannelSink {
41
42 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
43
44 LocalClientChannelSink() {
45 super();
46 }
47
48 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
49 if (e instanceof ChannelStateEvent) {
50 ChannelStateEvent event = (ChannelStateEvent) e;
51
52 DefaultLocalChannel channel =
53 (DefaultLocalChannel) event.getChannel();
54 ChannelFuture future = event.getFuture();
55 ChannelState state = event.getState();
56 Object value = event.getValue();
57 switch (state) {
58 case OPEN:
59 if (Boolean.FALSE.equals(value)) {
60 channel.closeNow(future);
61 }
62 break;
63 case BOUND:
64 if (value != null) {
65 bind(channel, future, (LocalAddress) value);
66 } else {
67 channel.closeNow(future);
68 }
69 break;
70 case CONNECTED:
71 if (value != null) {
72 connect(channel, future, (LocalAddress) value);
73 } else {
74 channel.closeNow(future);
75 }
76 break;
77 case INTEREST_OPS:
78
79 future.setSuccess();
80 break;
81 }
82 }
83 else if (e instanceof MessageEvent) {
84 MessageEvent event = (MessageEvent) e;
85 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
86 boolean offered = channel.writeBuffer.offer(event);
87 assert offered;
88 channel.flushWriteBuffer();
89 }
90 }
91
92 private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
93 try {
94 if (!LocalChannelRegistry.register(localAddress, channel)) {
95 throw new ChannelException("address already in use: " + localAddress);
96 }
97
98 if (!channel.bound.compareAndSet(false, true)) {
99 throw new ChannelException("already bound");
100 }
101
102 channel.localAddress = localAddress;
103 future.setSuccess();
104 fireChannelBound(channel, localAddress);
105 } catch (Throwable t) {
106 LocalChannelRegistry.unregister(localAddress);
107 future.setFailure(t);
108 fireExceptionCaught(channel, t);
109 }
110 }
111
112 private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
113 Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
114 if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
115 future.setFailure(new ConnectException("connection refused"));
116 return;
117 }
118
119 DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
120 ChannelPipeline pipeline;
121 try {
122 pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
123 } catch (Exception e) {
124 future.setFailure(e);
125 fireExceptionCaught(channel, e);
126 logger.warn(
127 "Failed to initialize an accepted socket.", e);
128 return;
129 }
130
131 future.setSuccess();
132 DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
133 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
134 channel.pairedChannel = acceptedChannel;
135
136 bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
137 channel.remoteAddress = serverChannel.getLocalAddress();
138 fireChannelConnected(channel, serverChannel.getLocalAddress());
139
140 acceptedChannel.localAddress = serverChannel.getLocalAddress();
141 acceptedChannel.bound.set(true);
142 fireChannelBound(acceptedChannel, channel.getRemoteAddress());
143 acceptedChannel.remoteAddress = channel.getLocalAddress();
144 fireChannelConnected(acceptedChannel, channel.getLocalAddress());
145
146
147 channel.flushWriteBuffer();
148 acceptedChannel.flushWriteBuffer();
149 }
150 }