1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import java.io.IOException;
19 import java.lang.ref.SoftReference;
20 import java.net.SocketAddress;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.DatagramChannel;
23 import java.nio.channels.WritableByteChannel;
24
25 import org.jboss.netty.buffer.ChannelBuffer;
26 import org.jboss.netty.channel.FileRegion;
27
28
29
30
31
32
33 final class SocketSendBufferPool {
34
35 private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
36
37 private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
38 private static final int ALIGN_SHIFT = 4;
39 private static final int ALIGN_MASK = 15;
40
41 PreallocationRef poolHead = null;
42 Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
43
44 SocketSendBufferPool() {
45 super();
46 }
47
48 final SendBuffer acquire(Object message) {
49 if (message instanceof ChannelBuffer) {
50 return acquire((ChannelBuffer) message);
51 } else if (message instanceof FileRegion) {
52 return acquire((FileRegion) message);
53 }
54
55 throw new IllegalArgumentException(
56 "unsupported message type: " + message.getClass());
57 }
58
59 private final SendBuffer acquire(FileRegion src) {
60 if (src.getCount() == 0) {
61 return EMPTY_BUFFER;
62 }
63 return new FileSendBuffer(src);
64 }
65
66 private final SendBuffer acquire(ChannelBuffer src) {
67 final int size = src.readableBytes();
68 if (size == 0) {
69 return EMPTY_BUFFER;
70 }
71
72 if (src.isDirect()) {
73 return new UnpooledSendBuffer(src.toByteBuffer());
74 }
75 if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
76 return new UnpooledSendBuffer(src.toByteBuffer());
77 }
78
79 Preallocation current = this.current;
80 ByteBuffer buffer = current.buffer;
81 int remaining = buffer.remaining();
82 PooledSendBuffer dst;
83
84 if (size < remaining) {
85 int nextPos = buffer.position() + size;
86 ByteBuffer slice = buffer.duplicate();
87 buffer.position(align(nextPos));
88 slice.limit(nextPos);
89 current.refCnt ++;
90 dst = new PooledSendBuffer(current, slice);
91 } else if (size > remaining) {
92 this.current = current = getPreallocation();
93 buffer = current.buffer;
94 ByteBuffer slice = buffer.duplicate();
95 buffer.position(align(size));
96 slice.limit(size);
97 current.refCnt ++;
98 dst = new PooledSendBuffer(current, slice);
99 } else {
100 current.refCnt ++;
101 this.current = getPreallocation0();
102 dst = new PooledSendBuffer(current, current.buffer);
103 }
104
105 ByteBuffer dstbuf = dst.buffer;
106 dstbuf.mark();
107 src.getBytes(src.readerIndex(), dstbuf);
108 dstbuf.reset();
109 return dst;
110 }
111
112 private final Preallocation getPreallocation() {
113 Preallocation current = this.current;
114 if (current.refCnt == 0) {
115 current.buffer.clear();
116 return current;
117 }
118
119 return getPreallocation0();
120 }
121
122 private final Preallocation getPreallocation0() {
123 PreallocationRef ref = poolHead;
124 if (ref != null) {
125 do {
126 Preallocation p = ref.get();
127 ref = ref.next;
128
129 if (p != null) {
130 poolHead = ref;
131 return p;
132 }
133 } while (ref != null);
134
135 poolHead = ref;
136 }
137
138 return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
139 }
140
141 private static final int align(int pos) {
142 int q = pos >>> ALIGN_SHIFT;
143 int r = pos & ALIGN_MASK;
144 if (r != 0) {
145 q ++;
146 }
147 return q << ALIGN_SHIFT;
148 }
149
150 private final class Preallocation {
151 final ByteBuffer buffer;
152 int refCnt;
153
154 Preallocation(int capacity) {
155 buffer = ByteBuffer.allocateDirect(capacity);
156 }
157 }
158
159 private final class PreallocationRef extends SoftReference<Preallocation> {
160 final PreallocationRef next;
161
162 PreallocationRef(Preallocation prealloation, PreallocationRef next) {
163 super(prealloation);
164 this.next = next;
165 }
166 }
167
168 interface SendBuffer {
169 boolean finished();
170 long writtenBytes();
171 long totalBytes();
172
173 long transferTo(WritableByteChannel ch) throws IOException;
174 long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
175
176 void release();
177 }
178
179 class UnpooledSendBuffer implements SendBuffer {
180
181 final ByteBuffer buffer;
182 final int initialPos;
183
184 UnpooledSendBuffer(ByteBuffer buffer) {
185 this.buffer = buffer;
186 initialPos = buffer.position();
187 }
188
189 public final boolean finished() {
190 return !buffer.hasRemaining();
191 }
192
193 public final long writtenBytes() {
194 return buffer.position() - initialPos;
195 }
196
197 public final long totalBytes() {
198 return buffer.limit() - initialPos;
199 }
200
201 public final long transferTo(WritableByteChannel ch) throws IOException {
202 return ch.write(buffer);
203 }
204
205 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
206 return ch.send(buffer, raddr);
207 }
208
209 public void release() {
210
211 }
212 }
213
214 final class PooledSendBuffer implements SendBuffer {
215
216 private final Preallocation parent;
217 final ByteBuffer buffer;
218 final int initialPos;
219
220 PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
221 this.parent = parent;
222 this.buffer = buffer;
223 initialPos = buffer.position();
224 }
225
226 public boolean finished() {
227 return !buffer.hasRemaining();
228 }
229
230 public long writtenBytes() {
231 return buffer.position() - initialPos;
232 }
233
234 public long totalBytes() {
235 return buffer.limit() - initialPos;
236 }
237
238 public long transferTo(WritableByteChannel ch) throws IOException {
239 return ch.write(buffer);
240 }
241
242 public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
243 return ch.send(buffer, raddr);
244 }
245
246 public void release() {
247 final Preallocation parent = this.parent;
248 if (-- parent.refCnt == 0) {
249 parent.buffer.clear();
250 if (parent != current) {
251 poolHead = new PreallocationRef(parent, poolHead);
252 }
253 }
254 }
255 }
256
257 final class FileSendBuffer implements SendBuffer {
258
259 private final FileRegion file;
260 private long writtenBytes;
261
262
263 FileSendBuffer(FileRegion file) {
264 this.file = file;
265 }
266
267 public boolean finished() {
268 return writtenBytes >= file.getCount();
269 }
270
271 public long writtenBytes() {
272 return writtenBytes;
273 }
274
275 public long totalBytes() {
276 return file.getCount();
277 }
278
279 public long transferTo(WritableByteChannel ch) throws IOException {
280 long localWrittenBytes = file.transferTo(ch, writtenBytes);
281 writtenBytes += localWrittenBytes;
282 return localWrittenBytes;
283 }
284
285 public long transferTo(DatagramChannel ch, SocketAddress raddr)
286 throws IOException {
287 throw new UnsupportedOperationException();
288 }
289
290 public void release() {
291
292 }
293 }
294
295 static final class EmptySendBuffer implements SendBuffer {
296
297 EmptySendBuffer() {
298 super();
299 }
300
301 public final boolean finished() {
302 return true;
303 }
304
305 public final long writtenBytes() {
306 return 0;
307 }
308
309 public final long totalBytes() {
310 return 0;
311 }
312
313 public final long transferTo(WritableByteChannel ch) throws IOException {
314 return 0;
315 }
316
317 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
318 return 0;
319 }
320
321 public void release() {
322
323 }
324 }
325 }