View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.buffer.ChannelBuffers.*;
19  
20  import java.io.InputStream;
21  import java.io.PushbackInputStream;
22  
23  /**
24   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
25   * chunk.
26   *
27   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
28   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
29   * @version $Rev: 2236 $, $Date: 2010-04-12 19:22:51 +0900 (Mon, 12 Apr 2010) $
30   */
31  public class ChunkedStream implements ChunkedInput {
32  
33      static final int DEFAULT_CHUNK_SIZE = 8192;
34  
35      private final PushbackInputStream in;
36      private final int chunkSize;
37      private volatile long offset;
38  
39      /**
40       * Creates a new instance that fetches data from the specified stream.
41       */
42      public ChunkedStream(InputStream in) {
43          this(in, DEFAULT_CHUNK_SIZE);
44      }
45  
46      /**
47       * Creates a new instance that fetches data from the specified stream.
48       *
49       * @param chunkSize the number of bytes to fetch on each
50       *                  {@link #nextChunk()} call
51       */
52      public ChunkedStream(InputStream in, int chunkSize) {
53          if (in == null) {
54              throw new NullPointerException("in");
55          }
56          if (chunkSize <= 0) {
57              throw new IllegalArgumentException(
58                      "chunkSize: " + chunkSize +
59                      " (expected: a positive integer)");
60          }
61  
62          if (in instanceof PushbackInputStream) {
63              this.in = (PushbackInputStream) in;
64          } else {
65              this.in = new PushbackInputStream(in);
66          }
67          this.chunkSize = chunkSize;
68      }
69  
70      /**
71       * Returns the number of transferred bytes.
72       */
73      public long getTransferredBytes() {
74          return offset;
75      }
76  
77      public boolean hasNextChunk() throws Exception {
78          int b = in.read();
79          if (b < 0) {
80              return false;
81          } else {
82              in.unread(b);
83              return true;
84          }
85      }
86  
87      public boolean isEndOfInput() throws Exception {
88          return !hasNextChunk();
89      }
90  
91      public void close() throws Exception {
92          in.close();
93      }
94  
95      public Object nextChunk() throws Exception {
96          if (!hasNextChunk()) {
97              return null;
98          }
99  
100         final int availableBytes = in.available();
101         final int chunkSize;
102         if (availableBytes <= 0) {
103             chunkSize = this.chunkSize;
104         } else {
105             chunkSize = Math.min(this.chunkSize, in.available());
106         }
107         final byte[] chunk = new byte[chunkSize];
108         int readBytes = 0;
109         for (;;) {
110             int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
111             if (localReadBytes < 0) {
112                 break;
113             }
114             readBytes += localReadBytes;
115             offset += localReadBytes;
116 
117             if (readBytes == chunkSize) {
118                 break;
119             }
120         }
121 
122         return wrappedBuffer(chunk, 0, readBytes);
123     }
124 }