1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import java.io.IOException;
19 import java.lang.ref.SoftReference;
20 import java.net.SocketAddress;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.DatagramChannel;
23 import java.nio.channels.GatheringByteChannel;
24 import java.nio.channels.WritableByteChannel;
25
26 import org.jboss.netty.buffer.ChannelBuffer;
27 import org.jboss.netty.buffer.CompositeChannelBuffer;
28 import org.jboss.netty.channel.DefaultFileRegion;
29 import org.jboss.netty.channel.FileRegion;
30
31 final class SocketSendBufferPool {
32
33 private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
34
35 private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
36 private static final int ALIGN_SHIFT = 4;
37 private static final int ALIGN_MASK = 15;
38
39 PreallocationRef poolHead;
40 Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
41
42 SocketSendBufferPool() {
43 super();
44 }
45
46 SendBuffer acquire(Object message) {
47 if (message instanceof ChannelBuffer) {
48 return acquire((ChannelBuffer) message);
49 } else if (message instanceof FileRegion) {
50 return acquire((FileRegion) message);
51 }
52
53 throw new IllegalArgumentException(
54 "unsupported message type: " + message.getClass());
55 }
56
57 private SendBuffer acquire(FileRegion src) {
58 if (src.getCount() == 0) {
59 return EMPTY_BUFFER;
60 }
61 return new FileSendBuffer(src);
62 }
63
64 private SendBuffer acquire(ChannelBuffer src) {
65 final int size = src.readableBytes();
66 if (size == 0) {
67 return EMPTY_BUFFER;
68 }
69
70
71 if (src instanceof CompositeChannelBuffer && ((CompositeChannelBuffer) src).useGathering()) {
72 return new GatheringSendBuffer(src.toByteBuffers());
73 }
74
75 if (src.isDirect()) {
76 return new UnpooledSendBuffer(src.toByteBuffer());
77 }
78 if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
79 return new UnpooledSendBuffer(src.toByteBuffer());
80 }
81
82 Preallocation current = this.current;
83 ByteBuffer buffer = current.buffer;
84 int remaining = buffer.remaining();
85 PooledSendBuffer dst;
86
87 if (size < remaining) {
88 int nextPos = buffer.position() + size;
89 ByteBuffer slice = buffer.duplicate();
90 buffer.position(align(nextPos));
91 slice.limit(nextPos);
92 current.refCnt ++;
93 dst = new PooledSendBuffer(current, slice);
94 } else if (size > remaining) {
95 this.current = current = getPreallocation();
96 buffer = current.buffer;
97 ByteBuffer slice = buffer.duplicate();
98 buffer.position(align(size));
99 slice.limit(size);
100 current.refCnt ++;
101 dst = new PooledSendBuffer(current, slice);
102 } else {
103 current.refCnt ++;
104 this.current = getPreallocation0();
105 dst = new PooledSendBuffer(current, current.buffer);
106 }
107
108 ByteBuffer dstbuf = dst.buffer;
109 dstbuf.mark();
110 src.getBytes(src.readerIndex(), dstbuf);
111 dstbuf.reset();
112 return dst;
113 }
114
115 private Preallocation getPreallocation() {
116 Preallocation current = this.current;
117 if (current.refCnt == 0) {
118 current.buffer.clear();
119 return current;
120 }
121
122 return getPreallocation0();
123 }
124
125 private Preallocation getPreallocation0() {
126 PreallocationRef ref = poolHead;
127 if (ref != null) {
128 do {
129 Preallocation p = ref.get();
130 ref = ref.next;
131
132 if (p != null) {
133 poolHead = ref;
134 return p;
135 }
136 } while (ref != null);
137
138 poolHead = ref;
139 }
140
141 return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
142 }
143
144 private static int align(int pos) {
145 int q = pos >>> ALIGN_SHIFT;
146 int r = pos & ALIGN_MASK;
147 if (r != 0) {
148 q ++;
149 }
150 return q << ALIGN_SHIFT;
151 }
152
153 private final class Preallocation {
154 final ByteBuffer buffer;
155 int refCnt;
156
157 Preallocation(int capacity) {
158 buffer = ByteBuffer.allocateDirect(capacity);
159 }
160 }
161
162 private final class PreallocationRef extends SoftReference<Preallocation> {
163 final PreallocationRef next;
164
165 PreallocationRef(Preallocation prealloation, PreallocationRef next) {
166 super(prealloation);
167 this.next = next;
168 }
169 }
170
/**
 * Data prepared for writing to a channel, with progress tracking.
 */
interface SendBuffer {

    /** Transfers as much as possible to a stream-style channel; returns the bytes transferred by this call. */
    long transferTo(WritableByteChannel ch) throws IOException;

    /** Transfers to a datagram channel addressed to {@code raddr}; returns the bytes transferred by this call. */
    long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;

    /** Tells whether everything has been transferred. */
    boolean finished();

    /** Number of bytes transferred so far. */
    long writtenBytes();

    /** Total number of bytes this buffer represents. */
    long totalBytes();

    /** Gives back whatever resources this buffer holds. */
    void release();
}
181
182 class UnpooledSendBuffer implements SendBuffer {
183
184 final ByteBuffer buffer;
185 final int initialPos;
186
187 UnpooledSendBuffer(ByteBuffer buffer) {
188 this.buffer = buffer;
189 initialPos = buffer.position();
190 }
191
192 public final boolean finished() {
193 return !buffer.hasRemaining();
194 }
195
196 public final long writtenBytes() {
197 return buffer.position() - initialPos;
198 }
199
200 public final long totalBytes() {
201 return buffer.limit() - initialPos;
202 }
203
204 public final long transferTo(WritableByteChannel ch) throws IOException {
205 return ch.write(buffer);
206 }
207
208 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
209 return ch.send(buffer, raddr);
210 }
211
212 public void release() {
213
214 }
215 }
216
217 final class PooledSendBuffer extends UnpooledSendBuffer {
218
219 private final Preallocation parent;
220
221 PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
222 super(buffer);
223 this.parent = parent;
224 }
225
226 @Override
227 public void release() {
228 final Preallocation parent = this.parent;
229 if (-- parent.refCnt == 0) {
230 parent.buffer.clear();
231 if (parent != current) {
232 poolHead = new PreallocationRef(parent, poolHead);
233 }
234 }
235 }
236 }
237
238 class GatheringSendBuffer implements SendBuffer {
239
240 private final ByteBuffer[] buffers;
241 private final int last;
242 private long written;
243 private final int total;
244
245 GatheringSendBuffer(ByteBuffer[] buffers) {
246 this.buffers = buffers;
247 last = buffers.length - 1;
248 int total = 0;
249 for (ByteBuffer buf: buffers) {
250 total += buf.remaining();
251 }
252 this.total = total;
253 }
254
255 public boolean finished() {
256 return !buffers[last].hasRemaining();
257 }
258
259 public long writtenBytes() {
260 return written;
261 }
262
263 public long totalBytes() {
264 return total;
265 }
266
267 public long transferTo(WritableByteChannel ch) throws IOException {
268 if (ch instanceof GatheringByteChannel) {
269 long w = ((GatheringByteChannel) ch).write(buffers);
270 written += w;
271 return w;
272 } else {
273 int send = 0;
274 for (ByteBuffer buf: buffers) {
275 if (buf.hasRemaining()) {
276 int w = ch.write(buf);
277 if (w == 0) {
278 break;
279 } else {
280 send += w;
281 }
282 }
283 }
284 written += send;
285 return send;
286 }
287 }
288
289 public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
290 int send = 0;
291 for (ByteBuffer buf: buffers) {
292 if (buf.hasRemaining()) {
293 int w = ch.send(buf, raddr);
294 if (w == 0) {
295 break;
296 } else {
297 send += w;
298 }
299 }
300 }
301 written += send;
302
303 return send;
304 }
305
306 public void release() {
307
308 }
309
310 }
311
312 final class FileSendBuffer implements SendBuffer {
313
314 private final FileRegion file;
315 private long writtenBytes;
316
317
318 FileSendBuffer(FileRegion file) {
319 this.file = file;
320 }
321
322 public boolean finished() {
323 return writtenBytes >= file.getCount();
324 }
325
326 public long writtenBytes() {
327 return writtenBytes;
328 }
329
330 public long totalBytes() {
331 return file.getCount();
332 }
333
334 public long transferTo(WritableByteChannel ch) throws IOException {
335 long localWrittenBytes = file.transferTo(ch, writtenBytes);
336 writtenBytes += localWrittenBytes;
337 return localWrittenBytes;
338 }
339
340 public long transferTo(DatagramChannel ch, SocketAddress raddr)
341 throws IOException {
342 throw new UnsupportedOperationException();
343 }
344
345 public void release() {
346 if (file instanceof DefaultFileRegion) {
347 if (((DefaultFileRegion) file).releaseAfterTransfer()) {
348
349
350 file.releaseExternalResources();
351 }
352 }
353 }
354 }
355
356 static final class EmptySendBuffer implements SendBuffer {
357
358 EmptySendBuffer() {
359 super();
360 }
361
362 public boolean finished() {
363 return true;
364 }
365
366 public long writtenBytes() {
367 return 0;
368 }
369
370 public long totalBytes() {
371 return 0;
372 }
373
374 public long transferTo(WritableByteChannel ch) throws IOException {
375 return 0;
376 }
377
378 public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
379 return 0;
380 }
381
382 public void release() {
383
384 }
385 }
386 }