1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.mortbay.io.nio;
17
18 import java.io.IOException;
19 import java.net.InetSocketAddress;
20 import java.net.Socket;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ByteChannel;
23 import java.nio.channels.GatheringByteChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SocketChannel;
26
27 import org.mortbay.io.Buffer;
28 import org.mortbay.io.EndPoint;
29 import org.mortbay.io.Portable;
30 import org.mortbay.log.Log;
31
32
33
34
35
36
37
38
39 public class ChannelEndPoint implements EndPoint
40 {
41 protected final ByteChannel _channel;
42 protected final ByteBuffer[] _gather2=new ByteBuffer[2];
43 protected final Socket _socket;
44 protected final InetSocketAddress _local;
45 protected final InetSocketAddress _remote;
46
47
48
49
50 public ChannelEndPoint(ByteChannel channel)
51 {
52 super();
53 this._channel = channel;
54 if (channel instanceof SocketChannel)
55 {
56 _socket=((SocketChannel)channel).socket();
57 _local=(InetSocketAddress)_socket.getLocalSocketAddress();
58 _remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
59 }
60 else
61 {
62 _socket=null;
63 _local=null;
64 _remote=null;
65 }
66 }
67
68 public boolean isBlocking()
69 {
70 if (_channel instanceof SelectableChannel)
71 return ((SelectableChannel)_channel).isBlocking();
72 return true;
73 }
74
75 public boolean blockReadable(long millisecs) throws IOException
76 {
77 return true;
78 }
79
80 public boolean blockWritable(long millisecs) throws IOException
81 {
82 return true;
83 }
84
85
86
87
88 public boolean isOpen()
89 {
90 return _channel.isOpen();
91 }
92
93
94
95
96 public void shutdownOutput() throws IOException
97 {
98 if (_channel.isOpen() && _channel instanceof SocketChannel)
99 {
100 Socket socket= ((SocketChannel)_channel).socket();
101 if (!socket.isClosed()&&!socket.isOutputShutdown())
102 socket.shutdownOutput();
103 }
104 }
105
106
107
108
109 public void close() throws IOException
110 {
111 if (_socket!=null && !_socket.isOutputShutdown())
112 _socket.shutdownOutput();
113 _channel.close();
114 }
115
116
117
118
119 public int fill(Buffer buffer) throws IOException
120 {
121 Buffer buf = buffer.buffer();
122 int len=0;
123 if (buf instanceof NIOBuffer)
124 {
125 NIOBuffer nbuf = (NIOBuffer)buf;
126 ByteBuffer bbuf=nbuf.getByteBuffer();
127 synchronized(nbuf)
128 {
129 try
130 {
131 bbuf.position(buffer.putIndex());
132 len=_channel.read(bbuf);
133 if (len<0)
134 _channel.close();
135 }
136 finally
137 {
138 buffer.setPutIndex(bbuf.position());
139 bbuf.position(0);
140 }
141 }
142 }
143 else
144 {
145 throw new IOException("Not Implemented");
146 }
147
148 return len;
149 }
150
151
152
153
154 public int flush(Buffer buffer) throws IOException
155 {
156 Buffer buf = buffer.buffer();
157 int len=0;
158 if (buf instanceof NIOBuffer)
159 {
160 NIOBuffer nbuf = (NIOBuffer)buf;
161 ByteBuffer bbuf=nbuf.getByteBuffer();
162
163
164 synchronized(bbuf)
165 {
166 try
167 {
168 bbuf.position(buffer.getIndex());
169 bbuf.limit(buffer.putIndex());
170 len=_channel.write(bbuf);
171 }
172 finally
173 {
174 if (len>0)
175 buffer.skip(len);
176 bbuf.position(0);
177 bbuf.limit(bbuf.capacity());
178 }
179 }
180 }
181 else if (buffer.array()!=null)
182 {
183 ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer.getIndex(), buffer.length());
184 len=_channel.write(b);
185 if (len>0)
186 buffer.skip(len);
187 }
188 else
189 {
190 throw new IOException("Not Implemented");
191 }
192 return len;
193 }
194
195
196
197
198 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
199 {
200 int length=0;
201
202 Buffer buf0 = header==null?null:header.buffer();
203 Buffer buf1 = buffer==null?null:buffer.buffer();
204
205 if (_channel instanceof GatheringByteChannel &&
206 header!=null && header.length()!=0 && header instanceof NIOBuffer &&
207 buffer!=null && buffer.length()!=0 && buffer instanceof NIOBuffer)
208 {
209 NIOBuffer nbuf0 = (NIOBuffer)buf0;
210 ByteBuffer bbuf0=nbuf0.getByteBuffer();
211 NIOBuffer nbuf1 = (NIOBuffer)buf1;
212 ByteBuffer bbuf1=nbuf1.getByteBuffer();
213
214 synchronized(this)
215 {
216
217 synchronized(bbuf0)
218 {
219 synchronized(bbuf1)
220 {
221 try
222 {
223
224 bbuf0.position(header.getIndex());
225 bbuf0.limit(header.putIndex());
226 bbuf1.position(buffer.getIndex());
227 bbuf1.limit(buffer.putIndex());
228
229 _gather2[0]=bbuf0;
230 _gather2[1]=bbuf1;
231
232
233 length=(int)((GatheringByteChannel)_channel).write(_gather2);
234
235 int hl=header.length();
236 if (length>hl)
237 {
238 header.clear();
239 buffer.skip(length-hl);
240 }
241 else if (length>0)
242 {
243 header.skip(length);
244 }
245
246 }
247 finally
248 {
249
250 if (!header.isImmutable())
251 header.setGetIndex(bbuf0.position());
252 if (!buffer.isImmutable())
253 buffer.setGetIndex(bbuf1.position());
254
255 bbuf0.position(0);
256 bbuf1.position(0);
257 bbuf0.limit(bbuf0.capacity());
258 bbuf1.limit(bbuf1.capacity());
259 }
260 }
261 }
262 }
263 }
264 else
265 {
266 if (header!=null)
267 {
268 if (buffer!=null && buffer.length()>0 && header.space()>buffer.length())
269 {
270 header.put(buffer);
271 buffer.clear();
272 }
273 if (trailer!=null && trailer.length()>0 && header.space()>trailer.length())
274 {
275 header.put(trailer);
276 trailer.clear();
277 }
278 }
279
280
281 if (header!=null && header.length()>0)
282 length=flush(header);
283
284
285 if ((header==null || header.length()==0) &&
286 buffer!=null && buffer.length()>0)
287 length+=flush(buffer);
288
289
290 if ((header==null || header.length()==0) &&
291 (buffer==null || buffer.length()==0) &&
292 trailer!=null && trailer.length()>0)
293 length+=flush(trailer);
294 }
295
296 return length;
297 }
298
299
300
301
302 public ByteChannel getChannel()
303 {
304 return _channel;
305 }
306
307
308
309
310
311
312 public String getLocalAddr()
313 {
314 if (_socket==null)
315 return null;
316
317 if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
318 return Portable.ALL_INTERFACES;
319
320 return _local.getAddress().getHostAddress();
321 }
322
323
324
325
326
327 public String getLocalHost()
328 {
329 if (_socket==null)
330 return null;
331
332 if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
333 return Portable.ALL_INTERFACES;
334
335 return _local.getAddress().getCanonicalHostName();
336 }
337
338
339
340
341
342 public int getLocalPort()
343 {
344 if (_socket==null)
345 return 0;
346 if (_local==null)
347 return -1;
348 return _local.getPort();
349 }
350
351
352
353
354
355 public String getRemoteAddr()
356 {
357 if (_socket==null)
358 return null;
359
360 if (_remote==null)
361 return null;
362 return _remote.getAddress().getHostAddress();
363 }
364
365
366
367
368
369 public String getRemoteHost()
370 {
371 if (_socket==null)
372 return null;
373
374 if (_remote==null)
375 return null;
376 return _remote.getAddress().getCanonicalHostName();
377 }
378
379
380
381
382
383 public int getRemotePort()
384 {
385 if (_socket==null)
386 return 0;
387
388 if (_remote==null)
389 return -1;
390 return _remote==null?-1:_remote.getPort();
391 }
392
393
394
395
396
397 public Object getTransport()
398 {
399 return _channel;
400 }
401
402
403 public void flush()
404 throws IOException
405 {
406 }
407
408
409 public boolean isBufferingInput()
410 {
411 return false;
412 }
413
414
415 public boolean isBufferingOutput()
416 {
417 return false;
418 }
419
420
421 public boolean isBufferred()
422 {
423 return false;
424 }
425 }