View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.queue;
17  
18  import java.io.IOException;
19  import java.nio.channels.ClosedChannelException;
20  import java.util.ArrayList;
21  import java.util.List;
22  import java.util.Queue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.jboss.netty.buffer.ChannelBuffer;
28  import org.jboss.netty.buffer.ChannelBuffers;
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelConfig;
31  import org.jboss.netty.channel.ChannelFuture;
32  import org.jboss.netty.channel.ChannelFutureListener;
33  import org.jboss.netty.channel.ChannelHandlerContext;
34  import org.jboss.netty.channel.ChannelStateEvent;
35  import org.jboss.netty.channel.Channels;
36  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
37  import org.jboss.netty.channel.MessageEvent;
38  import org.jboss.netty.channel.SimpleChannelHandler;
39  import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
40  import org.jboss.netty.util.HashedWheelTimer;
41  
42  /**
43   * Emulates buffered write operation.  This handler stores all write requests
44   * into an unbounded {@link Queue} and flushes them to the downstream when
45   * {@link #flush()} method is called.
46   * <p>
47   * Here is an example that demonstrates the usage:
48   * <pre>
49   * BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
50   * ChannelPipeline p = ...;
51   * p.addFirst("buffer", bufferedWriter);
52   *
53   * ...
54   *
55   * Channel ch = ...;
56   *
57   * // msg1, 2, and 3 are stored in the queue of bufferedWriter.
58   * ch.write(msg1);
59   * ch.write(msg2);
60   * ch.write(msg3);
61   *
62   * // and will be flushed on request.
63   * bufferedWriter.flush();
64   * </pre>
65   *
66   * <h3>Auto-flush</h3>
67   * The write request queue is automatically flushed when the associated
68   * {@link Channel} is disconnected or closed.  However, it does not flush the
69   * queue otherwise.  It means you have to call {@link #flush()} before the size
70   * of the queue increases too much.  You can implement your own auto-flush
71   * strategy by extending this handler:
72   * <pre>
73   * public class AutoFlusher extends {@link BufferedWriteHandler} {
74   *
75   *     private final AtomicLong bufferSize = new AtomicLong();
76   *
77   *     {@literal @Override}
78   *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
79   *         super.writeRequested(ctx, e);
80   *
81   *         {@link ChannelBuffer} data = ({@link ChannelBuffer}) e.getMessage();
82   *         int newBufferSize = bufferSize.addAndGet(data.readableBytes());
83   *
84   *         // Flush the queue if it gets larger than 8KiB.
85   *         if (newBufferSize > 8192) {
86   *             flush();
87   *             bufferSize.set(0);
88   *         }
89   *     }
90   * }
91   * </pre>
92   *
93   * <h3>Consolidate on flush</h3>
94   *
95   * If there are two or more write requests in the queue and all their message
96   * type is {@link ChannelBuffer}, they can be merged into a single write request
97   * to save the number of system calls.
98   * <pre>
99   * BEFORE consolidation:            AFTER consolidation:
100  * +-------+-------+-------+        +-------------+
101  * | Req C | Req B | Req A |------\\| Request ABC |
102  * | "789" | "456" | "123" |------//| "123456789" |
103  * +-------+-------+-------+        +-------------+
104  * </pre>
105  * This feature is disabled by default.  You can override the default when you
106  * create this handler or call {@link #flush(boolean)}.  If you specified
107  * {@code true} when you call the constructor, calling {@link #flush()} will
108  * always consolidate the queue.  Otherwise, you have to call
109  * {@link #flush(boolean)} with {@code true} to enable this feature for each
110  * flush.
111  * <p>
112  * The disadvantage of consolidation is that the {@link ChannelFuture} and its
113  * {@link ChannelFutureListener}s associated with the original write requests
114  * might be notified later than when they are actually written out.  They will
115  * always be notified when the consolidated write request is fully written.
116  * <p>
117  * The following example implements the consolidation strategy that reduces
118  * the number of write requests based on the writability of a channel:
119  * <pre>
120  * public class ConsolidatingAutoFlusher extends {@link BufferedWriteHandler} {
121  *
122  *     public ConsolidatingAutoFlusher() {
123  *         // Enable consolidation by default.
124  *         super(true);
125  *     }
126  *
127  *     {@literal @Override}
128  *     public void channelOpen({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
129  *         {@link ChannelConfig} cfg = e.getChannel().getConfig();
130  *         if (cfg instanceof {@link NioSocketChannelConfig}) {
131  *             // Lower the watermark to increase the chance of consolidation.
132  *             cfg.setWriteBufferLowWaterMark(0);
133  *         }
134  *         super.channelOpen(e);
135  *     }
136  *
137  *     {@literal @Override}
138  *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
139  *         super.writeRequested(ctx, et);
140  *         if (e.getChannel().isWritable()) {
141  *             flush();
142  *         }
143  *     }
144  *
145  *     {@literal @Override}
146  *     public void channelInterestChanged(
147  *             {@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
148  *         if (e.getChannel().isWritable()) {
149  *             flush();
150  *         }
151  *     }
152  * }
153  * </pre>
154  *
155  * <h3>Prioritized Writes</h3>
156  *
157  * You can implement prioritized writes by specifying an unbounded priority
158  * queue in the constructor of this handler.  It will be required to design
159  * the proper strategy to determine how often {@link #flush()} should be called.
160  * For example, you could call {@link #flush()} periodically, using
161  * {@link HashedWheelTimer} every second.
162  * @apiviz.landmark
163  */
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165 
166     private final Queue<MessageEvent> queue;
167     private final boolean consolidateOnFlush;
168     private volatile ChannelHandlerContext ctx;
169     private final AtomicBoolean flush = new AtomicBoolean(false);
170 
171     /**
172      * Creates a new instance with the default unbounded {@link BlockingQueue}
173      * implementation and without buffer consolidation.
174      */
175     public BufferedWriteHandler() {
176         this(false);
177     }
178 
179     /**
180      * Creates a new instance with the specified thread-safe unbounded
181      * {@link Queue} and without buffer consolidation.  Please note that
182      * specifying a bounded {@link Queue} or a thread-unsafe {@link Queue} will
183      * result in an unspecified behavior.
184      */
185     public BufferedWriteHandler(Queue<MessageEvent> queue) {
186         this(queue, false);
187     }
188 
189     /**
190      * Creates a new instance with {@link ConcurrentLinkedQueue}
191      *
192      * @param consolidateOnFlush
193      *        {@code true} if and only if the buffered write requests are merged
194      *        into a single write request on {@link #flush()}
195      */
196     public BufferedWriteHandler(boolean consolidateOnFlush) {
197         this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198     }
199 
200     /**
201      * Creates a new instance with the specified thread-safe unbounded
202      * {@link Queue}.  Please note that specifying a bounded {@link Queue} or
203      * a thread-unsafe {@link Queue} will result in an unspecified behavior.
204      *
205      * @param consolidateOnFlush
206      *        {@code true} if and only if the buffered write requests are merged
207      *        into a single write request on {@link #flush()}
208      */
209     public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210         if (queue == null) {
211             throw new NullPointerException("queue");
212         }
213         this.queue = queue;
214         this.consolidateOnFlush = consolidateOnFlush;
215     }
216 
217     public boolean isConsolidateOnFlush() {
218         return consolidateOnFlush;
219     }
220 
221     /**
222      * Returns the queue which stores the write requests.  The default
223      * implementation returns the queue which was specified in the constructor.
224      */
225     protected Queue<MessageEvent> getQueue() {
226         return queue;
227     }
228 
229     /**
230      * Sends the queued write requests to the downstream.
231      */
232     public void flush() {
233         flush(consolidateOnFlush);
234     }
235 
236     /**
237      * Sends the queued write requests to the downstream.
238      *
239      * @param consolidateOnFlush
240      *        {@code true} if and only if the buffered write requests are merged
241      *        into a single write request
242      */
243     public void flush(boolean consolidateOnFlush) {
244         final ChannelHandlerContext ctx = this.ctx;
245         if (ctx == null) {
246             // No write request was made.
247             return;
248         }
249         Channel channel = ctx.getChannel();
250         boolean acquired;
251 
252         // use CAS to see if the have flush already running, if so we don't need to take further actions
253         if (acquired = flush.compareAndSet(false, true)) {
254             final Queue<MessageEvent> queue = getQueue();
255             if (consolidateOnFlush) {
256                 if (queue.isEmpty()) {
257                     return;
258                 }
259 
260                 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
261                 for (;;) {
262                     MessageEvent e = queue.poll();
263                     if (e == null) {
264                         break;
265                     }
266                     if (!(e.getMessage() instanceof ChannelBuffer)) {
267                         if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
268                             pendingWrites = new ArrayList<MessageEvent>();
269                         }
270                         ctx.sendDownstream(e);
271                     } else {
272                         pendingWrites.add(e);
273                     }
274                 }
275                 consolidatedWrite(pendingWrites);
276 
277             } else {
278                 for (;;) {
279                     MessageEvent e = queue.poll();
280                     if (e == null) {
281                         break;
282                     }
283                     ctx.sendDownstream(e);
284                 }
285             }
286         }
287 
288         if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
289             flush(consolidateOnFlush);
290         }
291     }
292 
293     private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
294         final int size = pendingWrites.size();
295         if (size == 1) {
296             ctx.sendDownstream(pendingWrites.remove(0));
297             return pendingWrites;
298         } else if (size == 0) {
299             return pendingWrites;
300         }
301 
302         ChannelBuffer[] data = new ChannelBuffer[size];
303         for (int i = 0; i < data.length; i ++) {
304             data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
305         }
306 
307         ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
308         ChannelFuture future = Channels.future(ctx.getChannel());
309         future.addListener(new ChannelFutureListener() {
310             public void operationComplete(ChannelFuture future)
311                     throws Exception {
312                 if (future.isSuccess()) {
313                     for (MessageEvent e: pendingWrites) {
314                         e.getFuture().setSuccess();
315                     }
316                 } else {
317                     Throwable cause = future.getCause();
318                     for (MessageEvent e: pendingWrites) {
319                         e.getFuture().setFailure(cause);
320                     }
321                 }
322             }
323         });
324 
325         Channels.write(ctx, future, composite);
326         return null;
327     }
328 
329     /**
330      * Stores all write requests to the queue so that they are actually written
331      * on {@link #flush()}.
332      */
333     @Override
334     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
335             throws Exception {
336         if (this.ctx == null) {
337             this.ctx = ctx;
338         } else {
339             assert this.ctx == ctx;
340         }
341 
342         getQueue().add(e);
343     }
344 
345     @Override
346     public void disconnectRequested(ChannelHandlerContext ctx,
347             ChannelStateEvent e) throws Exception {
348         try {
349             flush(consolidateOnFlush);
350         } finally {
351             ctx.sendDownstream(e);
352         }
353     }
354 
355     @Override
356     public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
357             throws Exception {
358         try {
359             flush(consolidateOnFlush);
360         } finally {
361             ctx.sendDownstream(e);
362         }
363     }
364 
365     /**
366      * Fail all buffered writes that are left. See
367      * <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
368      */
369     @Override
370     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
371         Throwable cause = null;
372         for (;;) {
373             MessageEvent ev = queue.poll();
374 
375             if (ev == null) {
376                 break;
377             }
378 
379             if (cause == null) {
380                 cause = new ClosedChannelException();
381             }
382             ev.getFuture().setFailure(cause);
383 
384         }
385         if (cause != null) {
386             Channels.fireExceptionCaught(ctx.getChannel(), cause);
387         }
388 
389         super.channelClosed(ctx, e);
390     }
391 
392     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
393         // Nothing to do
394 
395     }
396 
397     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
398         // Nothing to do
399 
400     }
401 
402     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
403         // flush a last time before remove the handler
404         flush(consolidateOnFlush);
405     }
406 
407     /**
408      * Fail all buffered writes that are left.
409      * See <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
410      */
411     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
412         Throwable cause = null;
413         for (;;) {
414             MessageEvent ev = queue.poll();
415 
416             if (ev == null) {
417                 break;
418             }
419 
420             if (cause == null) {
421                 cause = new IOException("Unable to flush message");
422             }
423             ev.getFuture().setFailure(cause);
424 
425         }
426         if (cause != null) {
427             Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
428         }
429     }
430 
431 
432 }