View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.util.Queue;
21  
22  import org.jboss.netty.buffer.ChannelBuffers;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelDownstreamHandler;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.ChannelFutureListener;
28  import org.jboss.netty.channel.ChannelHandler;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelPipeline;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.ChannelUpstreamHandler;
33  import org.jboss.netty.channel.Channels;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.logging.InternalLogger;
36  import org.jboss.netty.logging.InternalLoggerFactory;
37  import org.jboss.netty.util.internal.LinkedTransferQueue;
38  
39  /**
40   * A {@link ChannelHandler} that adds support for writing a large data stream
41   * asynchronously neither spending a lot of memory nor getting
42   * {@link java.lang.OutOfMemoryError}.  Large data streaming such as file
43   * transfer requires complicated state management in a {@link ChannelHandler}
44   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
45   * so that you can send a large data stream without difficulties.
46   * <p>
47   * To use {@link ChunkedWriteHandler} in your application, you have to insert
48   * a new {@link ChunkedWriteHandler} instance:
49   * <pre>
50   * {@link ChannelPipeline} p = ...;
51   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
52   * p.addLast("handler", new MyHandler());
53   * </pre>
54   * Once inserted, you can write a {@link ChunkedInput} so that the
55   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
56   * stream chunk by chunk and write the fetched chunk downstream:
57   * <pre>
58   * {@link Channel} ch = ...;
59   * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
60   * </pre>
61   *
62   * <h3>Sending a stream which generates a chunk intermittently</h3>
63   *
64   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
65   * Such {@link ChunkedInput} implementation often returns {@code false} on
66   * {@link ChunkedInput#hasNextChunk()}, resulting in the indefinitely suspended
67   * transfer.  To resume the transfer when a new chunk is available, you have to
68   * call {@link #resumeTransfer()}.
69   *
70   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
71   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
72   * @version $Rev: 2233 $, $Date: 2010-04-06 17:43:49 +0900 (Tue, 06 Apr 2010) $
73   *
74   * @apiviz.landmark
75   * @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
76   */
77  public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
78  
79      private static final InternalLogger logger =
80          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
81  
82      private final Queue<MessageEvent> queue =
83          new LinkedTransferQueue<MessageEvent>();
84  
85      private ChannelHandlerContext ctx;
86      private MessageEvent currentEvent;
87  
88      /**
89       * Creates a new instance.
90       */
91      public ChunkedWriteHandler() {
92          super();
93      }
94  
95      /**
96       * Continues to fetch the chunks from the input.
97       */
98      public void resumeTransfer() {
99          ChannelHandlerContext ctx = this.ctx;
100         if (ctx == null) {
101             return;
102         }
103 
104         try {
105             flush(ctx);
106         } catch (Exception e) {
107             logger.warn("Unexpected exception while sending chunks.", e);
108         }
109     }
110 
111     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
112             throws Exception {
113         if (!(e instanceof MessageEvent)) {
114             ctx.sendDownstream(e);
115             return;
116         }
117 
118         boolean offered = queue.offer((MessageEvent) e);
119         assert offered;
120 
121         if (ctx.getChannel().isWritable()) {
122             this.ctx = ctx;
123             flush(ctx);
124         }
125     }
126 
127     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
128             throws Exception {
129         if (e instanceof ChannelStateEvent) {
130             ChannelStateEvent cse = (ChannelStateEvent) e;
131             switch (cse.getState()) {
132             case INTEREST_OPS:
133                 // Continue writing when the channel becomes writable.
134                 flush(ctx);
135                 break;
136             case OPEN:
137                 if (!Boolean.TRUE.equals(cse.getValue())) {
138                     // Fail all pending writes
139                     discard(ctx);
140                 }
141                 break;
142             }
143         }
144         ctx.sendUpstream(e);
145     }
146 
147     private synchronized void discard(ChannelHandlerContext ctx) {
148         for (;;) {
149             if (currentEvent == null) {
150                 currentEvent = queue.poll();
151             }
152 
153             if (currentEvent == null) {
154                 break;
155             }
156 
157             MessageEvent currentEvent = this.currentEvent;
158             this.currentEvent = null;
159 
160             Object m = currentEvent.getMessage();
161             if (m instanceof ChunkedInput) {
162                 closeInput((ChunkedInput) m);
163 
164                 // Trigger a ClosedChannelException
165                 Channels.write(
166                         ctx, currentEvent.getFuture(), ChannelBuffers.EMPTY_BUFFER,
167                         currentEvent.getRemoteAddress());
168             } else {
169                 // Trigger a ClosedChannelException
170                 ctx.sendDownstream(currentEvent);
171             }
172             currentEvent = null;
173         }
174     }
175 
176     private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
177         final Channel channel = ctx.getChannel();
178         if (!channel.isConnected()) {
179             discard(ctx);
180         }
181 
182         while (channel.isWritable()) {
183             if (currentEvent == null) {
184                 currentEvent = queue.poll();
185             }
186 
187             if (currentEvent == null) {
188                 break;
189             }
190 
191             if (currentEvent.getFuture().isDone()) {
192                 // Skip the current request because the previous partial write
193                 // attempt for the current request has been failed.
194                 currentEvent = null;
195             } else {
196                 Object m = currentEvent.getMessage();
197                 if (m instanceof ChunkedInput) {
198                     ChunkedInput chunks = (ChunkedInput) m;
199                     Object chunk;
200                     boolean endOfInput;
201                     boolean later;
202                     try {
203                         chunk = chunks.nextChunk();
204                         if (chunk == null) {
205                             chunk = ChannelBuffers.EMPTY_BUFFER;
206                             later = true;
207                         } else {
208                             later = false;
209                         }
210                         endOfInput = chunks.isEndOfInput();
211                     } catch (Throwable t) {
212                         MessageEvent currentEvent = this.currentEvent;
213                         this.currentEvent = null;
214 
215                         currentEvent.getFuture().setFailure(t);
216                         fireExceptionCaught(ctx, t);
217 
218                         closeInput(chunks);
219                         break;
220                     }
221 
222                     ChannelFuture writeFuture;
223                     final MessageEvent currentEvent = this.currentEvent;
224                     if (endOfInput) {
225                         this.currentEvent = null;
226                         closeInput(chunks);
227                         writeFuture = currentEvent.getFuture();
228                     } else {
229                         writeFuture = future(channel);
230                         writeFuture.addListener(new ChannelFutureListener() {
231                             public void operationComplete(ChannelFuture future)
232                                     throws Exception {
233                                 if (!future.isSuccess()) {
234                                     currentEvent.getFuture().setFailure(future.getCause());
235                                     closeInput((ChunkedInput) currentEvent.getMessage());
236                                 }
237                             }
238                         });
239                     }
240 
241                     Channels.write(
242                             ctx, writeFuture, chunk,
243                             currentEvent.getRemoteAddress());
244 
245                     if (later) {
246                         // ChunkedInput.nextChunk() returned null.
247                         // Let's wait until more chunks arrive.
248                         break;
249                     }
250                 } else {
251                     MessageEvent currentEvent = this.currentEvent;
252                     this.currentEvent = null;
253                     ctx.sendDownstream(currentEvent);
254                 }
255             }
256 
257             if (!channel.isConnected()) {
258                 discard(ctx);
259                 break;
260             }
261         }
262     }
263 
264     static void closeInput(ChunkedInput chunks) {
265         try {
266             chunks.close();
267         } catch (Throwable t) {
268             logger.warn("Failed to close a chunked input.", t);
269         }
270     }
271 }