1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http;
17
18 import static org.jboss.netty.channel.Channels.*;
19 import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20
21 import java.util.List;
22 import java.util.Map.Entry;
23
24 import org.jboss.netty.buffer.ChannelBuffer;
25 import org.jboss.netty.buffer.ChannelBuffers;
26 import org.jboss.netty.buffer.CompositeChannelBuffer;
27 import org.jboss.netty.channel.ChannelHandler;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34 import org.jboss.netty.handler.codec.frame.TooLongFrameException;
35 import org.jboss.netty.util.CharsetUtil;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
56 public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
57
58 private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
59 "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
60
61 private final int maxContentLength;
62 private HttpMessage currentMessage;
63
64 private ChannelHandlerContext ctx;
65
66 private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
67
68
69
70
71
72
73
74
75
76 public HttpChunkAggregator(int maxContentLength) {
77 if (maxContentLength <= 0) {
78 throw new IllegalArgumentException(
79 "maxContentLength must be a positive integer: " +
80 maxContentLength);
81 }
82 this.maxContentLength = maxContentLength;
83 }
84
85
86
87
88
89
90
91 public final int getMaxCumulationBufferComponents() {
92 return maxCumulationBufferComponents;
93 }
94
95
96
97
98
99
100
101
102 public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
103 if (maxCumulationBufferComponents < 2) {
104 throw new IllegalArgumentException(
105 "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
106 " (expected: >= 2)");
107 }
108
109 if (ctx == null) {
110 this.maxCumulationBufferComponents = maxCumulationBufferComponents;
111 } else {
112 throw new IllegalStateException(
113 "decoder properties cannot be changed once the decoder is added to a pipeline.");
114 }
115 }
116
117 @Override
118 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
119 throws Exception {
120
121 Object msg = e.getMessage();
122 HttpMessage currentMessage = this.currentMessage;
123
124 if (msg instanceof HttpMessage) {
125 HttpMessage m = (HttpMessage) msg;
126
127
128
129
130
131
132 if (is100ContinueExpected(m)) {
133 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
134 }
135
136 if (m.isChunked()) {
137
138
139 List<String> encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
140 encodings.remove(HttpHeaders.Values.CHUNKED);
141 if (encodings.isEmpty()) {
142 m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
143 }
144 m.setChunked(false);
145 this.currentMessage = m;
146 } else {
147
148 this.currentMessage = null;
149 ctx.sendUpstream(e);
150 }
151 } else if (msg instanceof HttpChunk) {
152
153 if (currentMessage == null) {
154 throw new IllegalStateException(
155 "received " + HttpChunk.class.getSimpleName() +
156 " without " + HttpMessage.class.getSimpleName());
157 }
158
159
160 HttpChunk chunk = (HttpChunk) msg;
161 ChannelBuffer content = currentMessage.getContent();
162
163 if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
164
165
166
167
168 throw new TooLongFrameException(
169 "HTTP content length exceeded " + maxContentLength +
170 " bytes.");
171 }
172
173
174 appendToCumulation(chunk.getContent());
175
176 if (chunk.isLast()) {
177 this.currentMessage = null;
178
179
180 if (chunk instanceof HttpChunkTrailer) {
181 HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
182 for (Entry<String, String> header: trailer.getHeaders()) {
183 currentMessage.setHeader(header.getKey(), header.getValue());
184 }
185 }
186
187
188 currentMessage.setHeader(
189 HttpHeaders.Names.CONTENT_LENGTH,
190 String.valueOf(content.readableBytes()));
191
192
193 Channels.fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
194 }
195 } else {
196
197 ctx.sendUpstream(e);
198 }
199 }
200
201 protected void appendToCumulation(ChannelBuffer input) {
202 ChannelBuffer cumulation = this.currentMessage.getContent();
203 if (cumulation instanceof CompositeChannelBuffer) {
204
205 CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
206 if (composite.numComponents() >= maxCumulationBufferComponents) {
207 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
208 } else {
209 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
210 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
211 buffers[buffers.length - 1] = input;
212
213 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
214 }
215 } else {
216 currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
217 }
218
219 }
220
221 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
222 this.ctx = ctx;
223 }
224
225 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
226
227 }
228
229 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
230
231 }
232
233 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
234
235 }
236 }