1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.buffer.ChannelBuffers.*;
19
20 import java.io.InputStream;
21 import java.io.PushbackInputStream;
22
23
24
25
26
27
28
29
30
31 public class ChunkedStream implements ChunkedInput {
32
33 static final int DEFAULT_CHUNK_SIZE = 8192;
34
35 private final PushbackInputStream in;
36 private final int chunkSize;
37 private volatile long offset;
38
39
40
41
42 public ChunkedStream(InputStream in) {
43 this(in, DEFAULT_CHUNK_SIZE);
44 }
45
46
47
48
49
50
51
52 public ChunkedStream(InputStream in, int chunkSize) {
53 if (in == null) {
54 throw new NullPointerException("in");
55 }
56 if (chunkSize <= 0) {
57 throw new IllegalArgumentException(
58 "chunkSize: " + chunkSize +
59 " (expected: a positive integer)");
60 }
61
62 if (in instanceof PushbackInputStream) {
63 this.in = (PushbackInputStream) in;
64 } else {
65 this.in = new PushbackInputStream(in);
66 }
67 this.chunkSize = chunkSize;
68 }
69
70
71
72
73 public long getTransferredBytes() {
74 return offset;
75 }
76
77 public boolean hasNextChunk() throws Exception {
78 int b = in.read();
79 if (b < 0) {
80 return false;
81 } else {
82 in.unread(b);
83 return true;
84 }
85 }
86
87 public boolean isEndOfInput() throws Exception {
88 return !hasNextChunk();
89 }
90
91 public void close() throws Exception {
92 in.close();
93 }
94
95 public Object nextChunk() throws Exception {
96 if (!hasNextChunk()) {
97 return null;
98 }
99
100 final int availableBytes = in.available();
101 final int chunkSize;
102 if (availableBytes <= 0) {
103 chunkSize = this.chunkSize;
104 } else {
105 chunkSize = Math.min(this.chunkSize, in.available());
106 }
107 final byte[] chunk = new byte[chunkSize];
108 int readBytes = 0;
109 for (;;) {
110 int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
111 if (localReadBytes < 0) {
112 break;
113 }
114 readBytes += localReadBytes;
115 offset += localReadBytes;
116
117 if (readBytes == chunkSize) {
118 break;
119 }
120 }
121
122 return wrappedBuffer(chunk, 0, readBytes);
123 }
124 }