1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.nio.channels.ClosedChannelException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.jboss.netty.buffer.ChannelBuffers;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelDownstreamHandler;
29 import org.jboss.netty.channel.ChannelEvent;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.channel.ChannelHandler;
33 import org.jboss.netty.channel.ChannelHandlerContext;
34 import org.jboss.netty.channel.ChannelPipeline;
35 import org.jboss.netty.channel.ChannelStateEvent;
36 import org.jboss.netty.channel.ChannelUpstreamHandler;
37 import org.jboss.netty.channel.Channels;
38 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
39 import org.jboss.netty.channel.MessageEvent;
40 import org.jboss.netty.logging.InternalLogger;
41 import org.jboss.netty.logging.InternalLoggerFactory;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class ChunkedWriteHandler
77 implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
78
79 private static final InternalLogger logger =
80 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
81
82 private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
83
84 private volatile ChannelHandlerContext ctx;
85 private final AtomicBoolean flush = new AtomicBoolean(false);
86 private MessageEvent currentEvent;
87
88
89
90
91 public void resumeTransfer() {
92 ChannelHandlerContext ctx = this.ctx;
93 if (ctx == null) {
94 return;
95 }
96
97 try {
98 flush(ctx, false);
99 } catch (Exception e) {
100 if (logger.isWarnEnabled()) {
101 logger.warn("Unexpected exception while sending chunks.", e);
102 }
103 }
104 }
105
106 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
107 throws Exception {
108 if (!(e instanceof MessageEvent)) {
109 ctx.sendDownstream(e);
110 return;
111 }
112
113 boolean offered = queue.offer((MessageEvent) e);
114 assert offered;
115
116 final Channel channel = ctx.getChannel();
117
118
119 if (channel.isWritable() || !channel.isConnected()) {
120 this.ctx = ctx;
121 flush(ctx, false);
122 }
123 }
124
125 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
126 throws Exception {
127 if (e instanceof ChannelStateEvent) {
128 ChannelStateEvent cse = (ChannelStateEvent) e;
129 switch (cse.getState()) {
130 case INTEREST_OPS:
131
132 flush(ctx, true);
133 break;
134 case OPEN:
135 if (!Boolean.TRUE.equals(cse.getValue())) {
136
137 flush(ctx, true);
138 }
139 break;
140 }
141 }
142 ctx.sendUpstream(e);
143 }
144
145 private void discard(ChannelHandlerContext ctx, boolean fireNow) {
146 ClosedChannelException cause = null;
147
148 for (;;) {
149 MessageEvent currentEvent = this.currentEvent;
150
151 if (this.currentEvent == null) {
152 currentEvent = queue.poll();
153 } else {
154 this.currentEvent = null;
155 }
156
157 if (currentEvent == null) {
158 break;
159 }
160
161
162 Object m = currentEvent.getMessage();
163 if (m instanceof ChunkedInput) {
164 closeInput((ChunkedInput) m);
165 }
166
167
168 if (cause == null) {
169 cause = new ClosedChannelException();
170 }
171 currentEvent.getFuture().setFailure(cause);
172
173 currentEvent = null;
174 }
175
176
177 if (cause != null) {
178 if (fireNow) {
179 Channels.fireExceptionCaught(ctx.getChannel(), cause);
180 } else {
181 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
182 }
183 }
184 }
185
186 private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
187 boolean acquired = false;
188 final Channel channel = ctx.getChannel();
189
190
191 if (acquired = flush.compareAndSet(false, true)) {
192 try {
193
194 if (!channel.isConnected()) {
195 discard(ctx, fireNow);
196 return;
197 }
198
199 while (channel.isWritable()) {
200 if (currentEvent == null) {
201 currentEvent = queue.poll();
202 }
203
204 if (currentEvent == null) {
205 break;
206 }
207
208 if (currentEvent.getFuture().isDone()) {
209
210
211 currentEvent = null;
212 } else {
213 final MessageEvent currentEvent = this.currentEvent;
214 Object m = currentEvent.getMessage();
215 if (m instanceof ChunkedInput) {
216 final ChunkedInput chunks = (ChunkedInput) m;
217 Object chunk;
218 boolean endOfInput;
219 boolean suspend;
220 try {
221 chunk = chunks.nextChunk();
222 endOfInput = chunks.isEndOfInput();
223 if (chunk == null) {
224 chunk = ChannelBuffers.EMPTY_BUFFER;
225
226 suspend = !endOfInput;
227 } else {
228 suspend = false;
229 }
230 } catch (Throwable t) {
231 this.currentEvent = null;
232
233 currentEvent.getFuture().setFailure(t);
234 if (fireNow) {
235 fireExceptionCaught(ctx, t);
236 } else {
237 fireExceptionCaughtLater(ctx, t);
238 }
239
240 closeInput(chunks);
241 break;
242 }
243
244 if (suspend) {
245
246
247
248 break;
249 } else {
250 ChannelFuture writeFuture;
251 if (endOfInput) {
252 this.currentEvent = null;
253 writeFuture = currentEvent.getFuture();
254
255
256
257
258
259
260 writeFuture.addListener(new ChannelFutureListener() {
261
262 public void operationComplete(ChannelFuture future) throws Exception {
263 closeInput(chunks);
264 }
265 });
266 } else {
267 writeFuture = future(channel);
268 writeFuture.addListener(new ChannelFutureListener() {
269 public void operationComplete(ChannelFuture future) throws Exception {
270 if (!future.isSuccess()) {
271 currentEvent.getFuture().setFailure(future.getCause());
272 closeInput((ChunkedInput) currentEvent.getMessage());
273 }
274 }
275 });
276 }
277
278 Channels.write(
279 ctx, writeFuture, chunk,
280 currentEvent.getRemoteAddress());
281 }
282 } else {
283 this.currentEvent = null;
284 ctx.sendDownstream(currentEvent);
285 }
286 }
287
288 if (!channel.isConnected()) {
289 discard(ctx, fireNow);
290 return;
291 }
292 }
293 } finally {
294
295 flush.set(false);
296 }
297
298 }
299
300 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
301 flush(ctx, fireNow);
302 }
303 }
304
305 static void closeInput(ChunkedInput chunks) {
306 try {
307 chunks.close();
308 } catch (Throwable t) {
309 if (logger.isWarnEnabled()) {
310 logger.warn("Failed to close a chunked input.", t);
311 }
312 }
313 }
314
315 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
316
317
318 }
319
320 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
321
322
323 }
324
325 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
326
327
328
329 flush(ctx, false);
330 }
331
332
333 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
334
335
336
337 Throwable cause = null;
338 boolean fireExceptionCaught = false;
339
340 for (;;) {
341 MessageEvent currentEvent = this.currentEvent;
342
343 if (this.currentEvent == null) {
344 currentEvent = queue.poll();
345 } else {
346 this.currentEvent = null;
347 }
348
349 if (currentEvent == null) {
350 break;
351 }
352
353 Object m = currentEvent.getMessage();
354 if (m instanceof ChunkedInput) {
355 closeInput((ChunkedInput) m);
356 }
357
358
359 if (cause == null) {
360 cause = new IOException("Unable to flush event, discarding");
361 }
362 currentEvent.getFuture().setFailure(cause);
363 fireExceptionCaught = true;
364
365 currentEvent = null;
366 }
367
368 if (fireExceptionCaught) {
369 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
370 }
371 }
372 }