1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.http;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.atomic.AtomicLong;
21
22 import org.jboss.netty.buffer.ChannelBuffer;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelDownstreamHandler;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ChannelUpstreamHandler;
29 import org.jboss.netty.handler.codec.PrematureChannelClosureException;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class HttpClientCodec implements ChannelUpstreamHandler,
49 ChannelDownstreamHandler {
50
51
52 final Queue<HttpMethod> queue = new ConcurrentLinkedQueue<HttpMethod>();
53
54
55 volatile boolean done;
56
57 private final HttpRequestEncoder encoder = new Encoder();
58 private final HttpResponseDecoder decoder;
59 private final AtomicLong requestResponseCounter = new AtomicLong(0);
60
61 private final boolean failOnMissingResponse;
62
63
64
65
66
67
68
69 public HttpClientCodec() {
70 this(4096, 8192, 8192, false);
71 }
72
73
74
75
76 public HttpClientCodec(
77 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
78 this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
79 }
80
81
82
83
84 public HttpClientCodec(
85 int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
86 decoder = new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
87 this.failOnMissingResponse = failOnMissingResponse;
88 }
89
90 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
91 throws Exception {
92 decoder.handleUpstream(ctx, e);
93 }
94
95 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
96 throws Exception {
97 encoder.handleDownstream(ctx, e);
98 }
99
100 private final class Encoder extends HttpRequestEncoder {
101
102 Encoder() {
103 super();
104 }
105
106 @Override
107 protected Object encode(ChannelHandlerContext ctx, Channel channel,
108 Object msg) throws Exception {
109 if (msg instanceof HttpRequest && !done) {
110 queue.offer(((HttpRequest) msg).getMethod());
111 }
112
113 Object obj = super.encode(ctx, channel, msg);
114
115 if (failOnMissingResponse) {
116
117 if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) {
118 requestResponseCounter.incrementAndGet();
119 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
120
121 requestResponseCounter.incrementAndGet();
122 }
123 }
124 return obj;
125 }
126 }
127
128 private final class Decoder extends HttpResponseDecoder {
129
130 Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
131 super(maxInitialLineLength, maxHeaderSize, maxChunkSize);
132 }
133
134 @Override
135 protected Object decode(ChannelHandlerContext ctx, Channel channel,
136 ChannelBuffer buffer, State state) throws Exception {
137 if (done) {
138 return buffer.readBytes(actualReadableBytes());
139 } else {
140 Object msg = super.decode(ctx, channel, buffer, state);
141 if (failOnMissingResponse) {
142 decrement(msg);
143 }
144 return msg;
145 }
146 }
147
148 private void decrement(Object msg) {
149 if (msg == null) {
150 return;
151 }
152
153
154 if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
155 requestResponseCounter.decrementAndGet();
156 } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
157 requestResponseCounter.decrementAndGet();
158 } else if (msg instanceof Object[]) {
159
160
161 requestResponseCounter.decrementAndGet();
162 }
163 }
164 @Override
165 protected boolean isContentAlwaysEmpty(HttpMessage msg) {
166 final int statusCode = ((HttpResponse) msg).getStatus().getCode();
167 if (statusCode == 100) {
168
169 return true;
170 }
171
172
173
174 HttpMethod method = queue.poll();
175
176 char firstChar = method.getName().charAt(0);
177 switch (firstChar) {
178 case 'H':
179
180
181
182
183 if (HttpMethod.HEAD.equals(method)) {
184 return true;
185
186
187
188
189
190
191
192
193
194
195
196
197
198 }
199 break;
200 case 'C':
201
202 if (statusCode == 200) {
203 if (HttpMethod.CONNECT.equals(method)) {
204
205 done = true;
206 queue.clear();
207 return true;
208 }
209 }
210 break;
211 }
212
213 return super.isContentAlwaysEmpty(msg);
214 }
215
216 @Override
217 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
218 super.channelClosed(ctx, e);
219 if (failOnMissingResponse) {
220 long missingResponses = requestResponseCounter.get();
221 if (missingResponses > 0) {
222 throw new PrematureChannelClosureException(
223 "Channel closed but still missing " + missingResponses + " response(s)");
224 }
225 }
226 }
227 }
228 }