View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20  
21  import java.util.List;
22  import java.util.Map.Entry;
23  
24  import org.jboss.netty.buffer.ChannelBuffer;
25  import org.jboss.netty.buffer.ChannelBuffers;
26  import org.jboss.netty.buffer.CompositeChannelBuffer;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.Channels;
31  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34  import org.jboss.netty.handler.codec.frame.TooLongFrameException;
35  import org.jboss.netty.util.CharsetUtil;
36  
37  /**
38   * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
39   * and its following {@link HttpChunk}s into a single {@link HttpMessage} with
40   * no following {@link HttpChunk}s.  It is useful when you don't want to take
41   * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
42   * handler after {@link HttpMessageDecoder} in the {@link ChannelPipeline}:
43   * <pre>
44   * {@link ChannelPipeline} p = ...;
45   * ...
46   * p.addLast("decoder", new {@link HttpRequestDecoder}());
47   * p.addLast("aggregator", <b>new {@link HttpChunkAggregator}(1048576)</b>);
48   * ...
49   * p.addLast("encoder", new {@link HttpResponseEncoder}());
50   * p.addLast("handler", new HttpRequestHandler());
51   * </pre>
52   * @apiviz.landmark
53   * @apiviz.has org.jboss.netty.handler.codec.http.HttpChunk oneway - - filters out
54   */
55  public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
56      public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
57  
58      private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
59              "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
60  
61      private final int maxContentLength;
62      private HttpMessage currentMessage;
63  
64      private ChannelHandlerContext ctx;
65  
66      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
67  
68      /**
69       * Creates a new instance.
70       *
71       * @param maxContentLength
72       *        the maximum length of the aggregated content.
73       *        If the length of the aggregated content exceeds this value,
74       *        a {@link TooLongFrameException} will be raised.
75       */
76      public HttpChunkAggregator(int maxContentLength) {
77          if (maxContentLength <= 0) {
78              throw new IllegalArgumentException(
79                      "maxContentLength must be a positive integer: " +
80                      maxContentLength);
81          }
82          this.maxContentLength = maxContentLength;
83      }
84  
85      /**
86       * Returns the maximum number of components in the cumulation buffer.  If the number of
87       * the components in the cumulation buffer exceeds this value, the components of the
88       * cumulation buffer are consolidated into a single component, involving memory copies.
89       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
90       */
91      public final int getMaxCumulationBufferComponents() {
92          return maxCumulationBufferComponents;
93      }
94  
95      /**
96       * Sets the maximum number of components in the cumulation buffer.  If the number of
97       * the components in the cumulation buffer exceeds this value, the components of the
98       * cumulation buffer are consolidated into a single component, involving memory copies.
99       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
100      * and its minimum allowed value is {@code 2}.
101      */
102     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
103         if (maxCumulationBufferComponents < 2) {
104             throw new IllegalArgumentException(
105                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
106                     " (expected: >= 2)");
107         }
108 
109         if (ctx == null) {
110             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
111         } else {
112             throw new IllegalStateException(
113                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
114         }
115     }
116 
117     @Override
118     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
119             throws Exception {
120 
121         Object msg = e.getMessage();
122         HttpMessage currentMessage = this.currentMessage;
123 
124         if (msg instanceof HttpMessage) {
125             HttpMessage m = (HttpMessage) msg;
126 
127             // Handle the 'Expect: 100-continue' header if necessary.
128             // TODO: Respond with 413 Request Entity Too Large
129             //   and discard the traffic or close the connection.
130             //       No need to notify the upstream handlers - just log.
131             //       If decoding a response, just throw an exception.
132             if (is100ContinueExpected(m)) {
133                 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
134             }
135 
136             if (m.isChunked()) {
137                 // A chunked message - remove 'Transfer-Encoding' header,
138                 // initialize the cumulative buffer, and wait for incoming chunks.
139                 List<String> encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
140                 encodings.remove(HttpHeaders.Values.CHUNKED);
141                 if (encodings.isEmpty()) {
142                     m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
143                 }
144                 m.setChunked(false);
145                 this.currentMessage = m;
146             } else {
147                 // Not a chunked message - pass through.
148                 this.currentMessage = null;
149                 ctx.sendUpstream(e);
150             }
151         } else if (msg instanceof HttpChunk) {
152             // Sanity check
153             if (currentMessage == null) {
154                 throw new IllegalStateException(
155                         "received " + HttpChunk.class.getSimpleName() +
156                         " without " + HttpMessage.class.getSimpleName());
157             }
158 
159             // Merge the received chunk into the content of the current message.
160             HttpChunk chunk = (HttpChunk) msg;
161             ChannelBuffer content = currentMessage.getContent();
162 
163             if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
164                 // TODO: Respond with 413 Request Entity Too Large
165                 //   and discard the traffic or close the connection.
166                 //       No need to notify the upstream handlers - just log.
167                 //       If decoding a response, just throw an exception.
168                 throw new TooLongFrameException(
169                         "HTTP content length exceeded " + maxContentLength +
170                         " bytes.");
171             }
172 
173             // Append the content of the chunk
174             appendToCumulation(chunk.getContent());
175 
176             if (chunk.isLast()) {
177                 this.currentMessage = null;
178 
179                 // Merge trailing headers into the message.
180                 if (chunk instanceof HttpChunkTrailer) {
181                     HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
182                     for (Entry<String, String> header: trailer.getHeaders()) {
183                         currentMessage.setHeader(header.getKey(), header.getValue());
184                     }
185                 }
186 
187                 // Set the 'Content-Length' header.
188                 currentMessage.setHeader(
189                         HttpHeaders.Names.CONTENT_LENGTH,
190                         String.valueOf(content.readableBytes()));
191 
192                 // All done - generate the event.
193                 Channels.fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
194             }
195         } else {
196             // Neither HttpMessage or HttpChunk
197             ctx.sendUpstream(e);
198         }
199     }
200 
201     protected void appendToCumulation(ChannelBuffer input) {
202         ChannelBuffer cumulation = this.currentMessage.getContent();
203         if (cumulation instanceof CompositeChannelBuffer) {
204             // Make sure the resulting cumulation buffer has no more than the configured components.
205             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
206             if (composite.numComponents() >= maxCumulationBufferComponents) {
207                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
208             } else {
209                 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
210                 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
211                 buffers[buffers.length - 1] = input;
212 
213                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
214             }
215         } else {
216             currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
217         }
218 
219     }
220 
221     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
222         this.ctx = ctx;
223     }
224 
225     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
226         // noop
227     }
228 
229     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
230         // noop
231     }
232 
233     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
234         // noop
235     }
236 }