1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.embedder;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.lang.reflect.Array;
21 import java.util.ConcurrentModificationException;
22 import java.util.LinkedList;
23 import java.util.Queue;
24
25 import org.jboss.netty.buffer.ChannelBufferFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelEvent;
28 import org.jboss.netty.channel.ChannelFuture;
29 import org.jboss.netty.channel.ChannelHandler;
30 import org.jboss.netty.channel.ChannelHandlerContext;
31 import org.jboss.netty.channel.ChannelPipeline;
32 import org.jboss.netty.channel.ChannelPipelineException;
33 import org.jboss.netty.channel.ChannelSink;
34 import org.jboss.netty.channel.ChannelUpstreamHandler;
35 import org.jboss.netty.channel.Channels;
36 import org.jboss.netty.channel.DefaultChannelPipeline;
37 import org.jboss.netty.channel.ExceptionEvent;
38 import org.jboss.netty.channel.MessageEvent;
39
40
41
42
43 abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
44
45 private final Channel channel;
46 private final ChannelPipeline pipeline;
47 private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
48
49 final Queue<Object> productQueue = new LinkedList<Object>();
50
51
52
53
54
55 protected AbstractCodecEmbedder(ChannelHandler... handlers) {
56 pipeline = new EmbeddedChannelPipeline();
57 configurePipeline(handlers);
58 channel = new EmbeddedChannel(pipeline, sink);
59 fireInitialEvents();
60 }
61
62
63
64
65
66
67
68
69 protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
70 this(handlers);
71 getChannel().getConfig().setBufferFactory(bufferFactory);
72 }
73
74 private void fireInitialEvents() {
75
76 fireChannelOpen(channel);
77 fireChannelBound(channel, channel.getLocalAddress());
78 fireChannelConnected(channel, channel.getRemoteAddress());
79 }
80
81 private void configurePipeline(ChannelHandler... handlers) {
82 if (handlers == null) {
83 throw new NullPointerException("handlers");
84 }
85
86 if (handlers.length == 0) {
87 throw new IllegalArgumentException(
88 "handlers should contain at least one " +
89 ChannelHandler.class.getSimpleName() + '.');
90 }
91
92 for (int i = 0; i < handlers.length; i ++) {
93 ChannelHandler h = handlers[i];
94 if (h == null) {
95 throw new NullPointerException("handlers[" + i + "]");
96 }
97 pipeline.addLast(String.valueOf(i), handlers[i]);
98 }
99 pipeline.addLast("SINK", sink);
100 }
101
102 public boolean finish() {
103 close(channel);
104 fireChannelDisconnected(channel);
105 fireChannelUnbound(channel);
106 fireChannelClosed(channel);
107 return !productQueue.isEmpty();
108 }
109
110
111
112
113
114 protected final Channel getChannel() {
115 return channel;
116 }
117
118
119
120
121
122 protected final boolean isEmpty() {
123 return productQueue.isEmpty();
124 }
125
126 public final E poll() {
127 return (E) productQueue.poll();
128 }
129
130 public final E peek() {
131 return (E) productQueue.peek();
132 }
133
134 public final Object[] pollAll() {
135 final int size = size();
136 Object[] a = new Object[size];
137 for (int i = 0; i < size; i ++) {
138 E product = poll();
139 if (product == null) {
140 throw new ConcurrentModificationException();
141 }
142 a[i] = product;
143 }
144 return a;
145 }
146
147 @SuppressWarnings("unchecked")
148 public final <T> T[] pollAll(T[] a) {
149 if (a == null) {
150 throw new NullPointerException("a");
151 }
152
153 final int size = size();
154
155
156 if (a.length < size) {
157 a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
158 }
159
160 for (int i = 0;; i ++) {
161 T product = (T) poll();
162 if (product == null) {
163 break;
164 }
165 a[i] = product;
166 }
167
168
169 if (a.length > size) {
170 a[size] = null;
171 }
172
173 return a;
174 }
175
176 public final int size() {
177 return productQueue.size();
178 }
179
180 public ChannelPipeline getPipeline() {
181 return pipeline;
182 }
183
184 private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
185 EmbeddedChannelSink() {
186 super();
187 }
188
189 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
190 handleEvent(e);
191 }
192
193 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
194 handleEvent(e);
195 }
196
197 private void handleEvent(ChannelEvent e) {
198 if (e instanceof MessageEvent) {
199 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
200 assert offered;
201 } else if (e instanceof ExceptionEvent) {
202 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
203 }
204
205
206 }
207
208 public void exceptionCaught(
209 ChannelPipeline pipeline, ChannelEvent e,
210 ChannelPipelineException cause) throws Exception {
211 Throwable actualCause = cause.getCause();
212 if (actualCause == null) {
213 actualCause = cause;
214 }
215
216 throw new CodecEmbedderException(actualCause);
217 }
218
219 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
220 try {
221 task.run();
222 return Channels.succeededFuture(pipeline.getChannel());
223 } catch (Throwable t) {
224 return Channels.failedFuture(pipeline.getChannel(), t);
225 }
226 }
227 }
228
229 private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
230
231 EmbeddedChannelPipeline() {
232 super();
233 }
234
235 @Override
236 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
237 while (t instanceof ChannelPipelineException && t.getCause() != null) {
238 t = t.getCause();
239 }
240 if (t instanceof CodecEmbedderException) {
241 throw (CodecEmbedderException) t;
242 } else {
243 throw new CodecEmbedderException(t);
244 }
245 }
246 }
247 }