View Javadoc

1   // ========================================================================
2   // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.File;
18  import java.io.FileInputStream;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.OutputStream;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.Channels;
24  import java.nio.channels.FileChannel;
25  import java.nio.channels.ReadableByteChannel;
26  import java.nio.channels.WritableByteChannel;
27  
28  import org.mortbay.io.AbstractBuffer;
29  import org.mortbay.io.Buffer;
30  
31  /* ------------------------------------------------------------------------------- */
32  /** 
33   * 
34   * @author gregw
35   */
36  public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
37  { 	
38      protected ByteBuffer _buf;
39      private ReadableByteChannel _in;
40      private InputStream _inStream;
41      private WritableByteChannel _out;
42      private OutputStream _outStream;
43  
44      public DirectNIOBuffer(int size)
45      {
46          super(READWRITE,NON_VOLATILE);
47          _buf = ByteBuffer.allocateDirect(size);
48          _buf.position(0);
49          _buf.limit(_buf.capacity());
50      }
51      
52      public DirectNIOBuffer(ByteBuffer buffer,boolean immutable)
53      {
54          super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE);
55          if (!buffer.isDirect())
56              throw new IllegalArgumentException();
57          _buf = buffer;
58          setGetIndex(buffer.position());
59          setPutIndex(buffer.limit());
60      }
61  
62      /**
63       * @param file
64       */
65      public DirectNIOBuffer(File file) throws IOException
66      {
67          super(READONLY,NON_VOLATILE);
68          FileInputStream fis = new FileInputStream(file);
69          FileChannel fc = fis.getChannel();
70          _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
71          setGetIndex(0);
72          setPutIndex((int)file.length());
73          _access=IMMUTABLE;
74      }
75  
76      /* ------------------------------------------------------------ */
77      public boolean isDirect()
78      {
79          return true;
80      }
81  
82      /* ------------------------------------------------------------ */
83      public byte[] array()
84      {
85          return null;
86      }
87  
88      /* ------------------------------------------------------------ */
89      public int capacity()
90      {
91          return _buf.capacity();
92      }
93  
94      /* ------------------------------------------------------------ */
95      public byte peek(int position)
96      {
97          return _buf.get(position);
98      }
99  
100     public int peek(int index, byte[] b, int offset, int length)
101     {
102         int l = length;
103         if (index+l > capacity())
104         {
105             l=capacity()-index;
106             if (l==0)
107                 return -1;
108         }
109         
110         if (l < 0) 
111             return -1;
112         try
113         {
114             _buf.position(index);
115             _buf.get(b,offset,l);
116         }
117         finally
118         {
119             _buf.position(0);
120         }
121         
122         return l;
123     }
124 
125     public void poke(int index, byte b)
126     {
127         if (isReadOnly()) throw new IllegalStateException(__READONLY);
128         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
129         if (index > capacity())
130                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
131         _buf.put(index,b);
132     }
133 
134     public int poke(int index, Buffer src)
135     {
136         if (isReadOnly()) throw new IllegalStateException(__READONLY);
137 
138         byte[] array=src.array();
139         if (array!=null)
140         {
141             int length = poke(index,array,src.getIndex(),src.length());
142             return length;
143         }
144         else
145         {
146             Buffer src_buf=src.buffer();
147             if (src_buf instanceof DirectNIOBuffer)
148             {
149                 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
150                 if (src_bytebuf==_buf)
151                     src_bytebuf=_buf.duplicate();
152                 try
153                 {   
154                     _buf.position(index);
155                     int space = _buf.remaining();
156                     
157                     int length=src.length();
158                     if (length>space)    
159                         length=space;
160                     
161                     src_bytebuf.position(src.getIndex());
162                     src_bytebuf.limit(src.getIndex()+length);
163                     
164                     _buf.put(src_bytebuf);
165                     return length;
166                 }
167                 finally
168                 {
169                     _buf.position(0);
170                     src_bytebuf.limit(src_bytebuf.capacity());
171                     src_bytebuf.position(0);
172                 }
173             }
174             else
175                 return super.poke(index,src);
176         }
177     }
178     
179     public int poke(int index, byte[] b, int offset, int length)
180     {
181         if (isReadOnly()) throw new IllegalStateException(__READONLY);
182 
183         if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
184 
185         if (index + length > capacity())
186         {
187             length=capacity()-index;
188             if (length<0)
189                 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
190         }
191 
192         try
193         {
194             _buf.position(index);
195             
196             int space=_buf.remaining();
197             
198             if (length>space)
199                 length=space;
200             if (length>0)
201                 _buf.put(b,offset,length);
202             return length;
203         }
204         finally
205         {
206             _buf.position(0);
207         }
208     }
209     
210     /* ------------------------------------------------------------ */
211     public ByteBuffer getByteBuffer()
212     {
213         return _buf;
214     }
215 
216     /* ------------------------------------------------------------ */
217     public int readFrom(InputStream in, int max) throws IOException
218     {
219         if (_in==null || !_in.isOpen() || in!=_inStream)
220         {
221             _in=Channels.newChannel(in);
222             _inStream=in;
223         }
224 
225         if (max<0 || max>space())
226             max=space();
227         int p = putIndex();
228         
229         try
230         {
231             int len=0, total=0, available=max;
232             int loop=0;
233             while (total<max) 
234             {
235                 _buf.position(p);
236                 _buf.limit(p+available);
237                 len=_in.read(_buf);
238                 if (len<0)
239                 {
240                     _in=null;
241                     _inStream=in;
242                     break;
243                 }
244                 else if (len>0)
245                 {
246                     p += len;
247                     total += len;
248                     available -= len;
249                     setPutIndex(p);
250                     loop=0;
251                 }
252                 else if (loop++>1)
253                     break;
254                 if (in.available()<=0)
255                     break;
256             }
257             if (len<0 && total==0)
258                 return -1;
259             return total;
260             
261         }
262         catch(IOException e)
263         {
264             _in=null;
265             _inStream=in;
266             throw e;
267         }
268         finally
269         {
270             if (_in!=null && !_in.isOpen())
271             {
272                 _in=null;
273                 _inStream=in;
274             }
275             _buf.position(0);
276             _buf.limit(_buf.capacity());
277         }
278     }
279 
280     /* ------------------------------------------------------------ */
281     public void writeTo(OutputStream out) throws IOException
282     {
283         if (_out==null || !_out.isOpen() || _out!=_outStream)
284         {
285             _out=Channels.newChannel(out);
286             _outStream=out;
287         }
288 
289         synchronized (_buf)
290         {
291             try
292             {
293                 int loop=0;
294                 while(hasContent() && _out.isOpen())
295                 {
296                     _buf.position(getIndex());
297                     _buf.limit(putIndex());
298                     int len=_out.write(_buf);
299                     if (len<0)
300                         break;
301                     else if (len>0)
302                     {
303                         skip(len);
304                         loop=0;
305                     }
306                     else if (loop++>1)
307                         break;
308                 }
309 
310             }
311             catch(IOException e)
312             {
313                 _out=null;
314                 _outStream=null;
315                 throw e;
316             }
317             finally
318             {
319                 if (_out!=null && !_out.isOpen())
320                 {
321                     _out=null;
322                     _outStream=null;
323                 }
324                 _buf.position(0);
325                 _buf.limit(_buf.capacity());
326             }
327         }
328     }
329 
330     
331     
332 }