1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.buffer.ChannelBuffers;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
31 import org.jboss.netty.util.internal.jzlib.JZlib;
32 import org.jboss.netty.util.internal.jzlib.ZStream;
33
34
35
36
37
38
39
40
41
42
43
44
45 public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler {
46
47 private static final byte[] EMPTY_ARRAY = new byte[0];
48
49 private final ZStream z = new ZStream();
50 private final AtomicBoolean finished = new AtomicBoolean();
51 private volatile ChannelHandlerContext ctx;
52
53
54
55
56
57
58
59 public ZlibEncoder() {
60 this(6);
61 }
62
63
64
65
66
67
68
69
70
71
72
73
74 public ZlibEncoder(int compressionLevel) {
75 this(ZlibWrapper.ZLIB, compressionLevel);
76 }
77
78
79
80
81
82
83
84 public ZlibEncoder(ZlibWrapper wrapper) {
85 this(wrapper, 6);
86 }
87
88
89
90
91
92
93
94
95
96
97
98 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99 if (compressionLevel < 0 || compressionLevel > 9) {
100 throw new IllegalArgumentException(
101 "compressionLevel: " + compressionLevel +
102 " (expected: 0-9)");
103 }
104 if (wrapper == null) {
105 throw new NullPointerException("wrapper");
106 }
107
108 synchronized (z) {
109 int resultCode = z.deflateInit(compressionLevel, ZlibUtil.convertWrapperType(wrapper));
110 if (resultCode != JZlib.Z_OK) {
111 ZlibUtil.fail(z, "initialization failure", resultCode);
112 }
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126 public ZlibEncoder(byte[] dictionary) {
127 this(6, dictionary);
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
145 if (compressionLevel < 0 || compressionLevel > 9) {
146 throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
147 }
148
149 if (dictionary == null) {
150 throw new NullPointerException("dictionary");
151 }
152
153 synchronized (z) {
154 int resultCode;
155 resultCode = z.deflateInit(compressionLevel, JZlib.W_ZLIB);
156 if (resultCode != JZlib.Z_OK) {
157 ZlibUtil.fail(z, "initialization failure", resultCode);
158 } else {
159 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
160 if (resultCode != JZlib.Z_OK){
161 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
162 }
163 }
164 }
165 }
166
167 public ChannelFuture close() {
168 ChannelHandlerContext ctx = this.ctx;
169 if (ctx == null) {
170 throw new IllegalStateException("not added to a pipeline");
171 }
172 return finishEncode(ctx, null);
173 }
174
175 public boolean isClosed() {
176 return finished.get();
177 }
178
179 @Override
180 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
181 if (!(msg instanceof ChannelBuffer) || finished.get()) {
182 return msg;
183 }
184
185 ChannelBuffer result;
186 synchronized (z) {
187 try {
188
189 ChannelBuffer uncompressed = (ChannelBuffer) msg;
190 byte[] in = new byte[uncompressed.readableBytes()];
191 uncompressed.readBytes(in);
192 z.next_in = in;
193 z.next_in_index = 0;
194 z.avail_in = in.length;
195
196
197 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
198 z.next_out = out;
199 z.next_out_index = 0;
200 z.avail_out = out.length;
201
202
203 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
204 if (resultCode != JZlib.Z_OK) {
205 ZlibUtil.fail(z, "compression failure", resultCode);
206 }
207
208 if (z.next_out_index != 0) {
209 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
210 uncompressed.order(), out, 0, z.next_out_index);
211 } else {
212 result = ChannelBuffers.EMPTY_BUFFER;
213 }
214 } finally {
215
216
217
218
219 z.next_in = null;
220 z.next_out = null;
221 }
222 }
223
224 return result;
225 }
226
227 @Override
228 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
229 throws Exception {
230 if (evt instanceof ChannelStateEvent) {
231 ChannelStateEvent e = (ChannelStateEvent) evt;
232 switch (e.getState()) {
233 case OPEN:
234 case CONNECTED:
235 case BOUND:
236 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
237 finishEncode(ctx, evt);
238 return;
239 }
240 }
241 }
242
243 super.handleDownstream(ctx, evt);
244 }
245
246 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
247 if (!finished.compareAndSet(false, true)) {
248 if (evt != null) {
249 ctx.sendDownstream(evt);
250 }
251 return Channels.succeededFuture(ctx.getChannel());
252 }
253
254 ChannelBuffer footer;
255 ChannelFuture future;
256 synchronized (z) {
257 try {
258
259 z.next_in = EMPTY_ARRAY;
260 z.next_in_index = 0;
261 z.avail_in = 0;
262
263
264 byte[] out = new byte[32];
265 z.next_out = out;
266 z.next_out_index = 0;
267 z.avail_out = out.length;
268
269
270 int resultCode = z.deflate(JZlib.Z_FINISH);
271 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
272 future = Channels.failedFuture(
273 ctx.getChannel(),
274 ZlibUtil.exception(z, "compression failure", resultCode));
275 footer = null;
276 } else if (z.next_out_index != 0) {
277 future = Channels.future(ctx.getChannel());
278 footer =
279 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
280 out, 0, z.next_out_index);
281 } else {
282
283
284
285 future = Channels.future(ctx.getChannel());
286 footer = ChannelBuffers.EMPTY_BUFFER;
287 }
288 } finally {
289 z.deflateEnd();
290
291
292
293
294
295 z.next_in = null;
296 z.next_out = null;
297 }
298 }
299
300 if (footer != null) {
301 Channels.write(ctx, future, footer);
302 }
303
304 if (evt != null) {
305 future.addListener(new ChannelFutureListener() {
306 public void operationComplete(ChannelFuture future) throws Exception {
307 ctx.sendDownstream(evt);
308 }
309 });
310 }
311
312 return future;
313 }
314
315 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
316 this.ctx = ctx;
317 }
318
319 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
320
321 }
322
323 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
324
325 }
326
327 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
328
329 }
330 }