1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.util.Queue;
21
22 import org.jboss.netty.buffer.ChannelBuffers;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelDownstreamHandler;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.channel.ChannelHandler;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelPipeline;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.ChannelUpstreamHandler;
33 import org.jboss.netty.channel.Channels;
34 import org.jboss.netty.channel.MessageEvent;
35 import org.jboss.netty.logging.InternalLogger;
36 import org.jboss.netty.logging.InternalLoggerFactory;
37 import org.jboss.netty.util.internal.LinkedTransferQueue;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
78
79 private static final InternalLogger logger =
80 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
81
82 private final Queue<MessageEvent> queue =
83 new LinkedTransferQueue<MessageEvent>();
84
85 private ChannelHandlerContext ctx;
86 private MessageEvent currentEvent;
87
88
89
90
91 public ChunkedWriteHandler() {
92 super();
93 }
94
95
96
97
98 public void resumeTransfer() {
99 ChannelHandlerContext ctx = this.ctx;
100 if (ctx == null) {
101 return;
102 }
103
104 try {
105 flush(ctx);
106 } catch (Exception e) {
107 logger.warn("Unexpected exception while sending chunks.", e);
108 }
109 }
110
111 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
112 throws Exception {
113 if (!(e instanceof MessageEvent)) {
114 ctx.sendDownstream(e);
115 return;
116 }
117
118 boolean offered = queue.offer((MessageEvent) e);
119 assert offered;
120
121 if (ctx.getChannel().isWritable()) {
122 this.ctx = ctx;
123 flush(ctx);
124 }
125 }
126
127 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
128 throws Exception {
129 if (e instanceof ChannelStateEvent) {
130 ChannelStateEvent cse = (ChannelStateEvent) e;
131 switch (cse.getState()) {
132 case INTEREST_OPS:
133
134 flush(ctx);
135 break;
136 case OPEN:
137 if (!Boolean.TRUE.equals(cse.getValue())) {
138
139 discard(ctx);
140 }
141 break;
142 }
143 }
144 ctx.sendUpstream(e);
145 }
146
147 private synchronized void discard(ChannelHandlerContext ctx) {
148 for (;;) {
149 if (currentEvent == null) {
150 currentEvent = queue.poll();
151 }
152
153 if (currentEvent == null) {
154 break;
155 }
156
157 MessageEvent currentEvent = this.currentEvent;
158 this.currentEvent = null;
159
160 Object m = currentEvent.getMessage();
161 if (m instanceof ChunkedInput) {
162 closeInput((ChunkedInput) m);
163
164
165 Channels.write(
166 ctx, currentEvent.getFuture(), ChannelBuffers.EMPTY_BUFFER,
167 currentEvent.getRemoteAddress());
168 } else {
169
170 ctx.sendDownstream(currentEvent);
171 }
172 currentEvent = null;
173 }
174 }
175
176 private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
177 final Channel channel = ctx.getChannel();
178 if (!channel.isConnected()) {
179 discard(ctx);
180 }
181
182 while (channel.isWritable()) {
183 if (currentEvent == null) {
184 currentEvent = queue.poll();
185 }
186
187 if (currentEvent == null) {
188 break;
189 }
190
191 if (currentEvent.getFuture().isDone()) {
192
193
194 currentEvent = null;
195 } else {
196 Object m = currentEvent.getMessage();
197 if (m instanceof ChunkedInput) {
198 ChunkedInput chunks = (ChunkedInput) m;
199 Object chunk;
200 boolean endOfInput;
201 boolean later;
202 try {
203 chunk = chunks.nextChunk();
204 if (chunk == null) {
205 chunk = ChannelBuffers.EMPTY_BUFFER;
206 later = true;
207 } else {
208 later = false;
209 }
210 endOfInput = chunks.isEndOfInput();
211 } catch (Throwable t) {
212 MessageEvent currentEvent = this.currentEvent;
213 this.currentEvent = null;
214
215 currentEvent.getFuture().setFailure(t);
216 fireExceptionCaught(ctx, t);
217
218 closeInput(chunks);
219 break;
220 }
221
222 ChannelFuture writeFuture;
223 final MessageEvent currentEvent = this.currentEvent;
224 if (endOfInput) {
225 this.currentEvent = null;
226 closeInput(chunks);
227 writeFuture = currentEvent.getFuture();
228 } else {
229 writeFuture = future(channel);
230 writeFuture.addListener(new ChannelFutureListener() {
231 public void operationComplete(ChannelFuture future)
232 throws Exception {
233 if (!future.isSuccess()) {
234 currentEvent.getFuture().setFailure(future.getCause());
235 closeInput((ChunkedInput) currentEvent.getMessage());
236 }
237 }
238 });
239 }
240
241 Channels.write(
242 ctx, writeFuture, chunk,
243 currentEvent.getRemoteAddress());
244
245 if (later) {
246
247
248 break;
249 }
250 } else {
251 MessageEvent currentEvent = this.currentEvent;
252 this.currentEvent = null;
253 ctx.sendDownstream(currentEvent);
254 }
255 }
256
257 if (!channel.isConnected()) {
258 discard(ctx);
259 break;
260 }
261 }
262 }
263
264 static void closeInput(ChunkedInput chunks) {
265 try {
266 chunks.close();
267 } catch (Throwable t) {
268 logger.warn("Failed to close a chunked input.", t);
269 }
270 }
271 }