1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.ClosedChannelException;
19 import java.nio.channels.SelectableChannel;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.SocketChannel;
22
23 import org.mortbay.io.Buffer;
24 import org.mortbay.io.Connection;
25 import org.mortbay.io.nio.SelectorManager.SelectSet;
26 import org.mortbay.jetty.EofException;
27 import org.mortbay.jetty.HttpException;
28 import org.mortbay.log.Log;
29 import org.mortbay.thread.Timeout;
30
31
32
33
34
35
36
37
38 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable
39 {
40 protected SelectorManager _manager;
41 protected SelectorManager.SelectSet _selectSet;
42 protected boolean _dispatched = false;
43 protected boolean _writable = true;
44 protected SelectionKey _key;
45 protected int _interestOps;
46 protected boolean _readBlocked;
47 protected boolean _writeBlocked;
48 protected Connection _connection;
49
50 private Timeout.Task _timeoutTask = new IdleTask();
51
52
53 public Connection getConnection()
54 {
55 return _connection;
56 }
57
58
59 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
60 {
61 super(channel);
62
63 _manager = selectSet.getManager();
64 _selectSet = selectSet;
65 _connection = _manager.newConnection(channel,this);
66
67 _manager.endPointOpened(this);
68
69 _key = key;
70 }
71
72
73 void dispatch() throws IOException
74 {
75 boolean dispatch_done = true;
76 try
77 {
78 if (dispatch(_manager.isDelaySelectKeyUpdate()))
79 {
80 dispatch_done= false;
81 dispatch_done = _manager.dispatch((Runnable)this);
82 }
83 }
84 finally
85 {
86 if (!dispatch_done)
87 {
88 Log.warn("dispatch failed!");
89 undispatch();
90 }
91 }
92 }
93
94
95
96
97
98
99
100
101
102
103 public boolean dispatch(boolean assumeShortDispatch) throws IOException
104 {
105
106 synchronized (this)
107 {
108 if (_key == null || !_key.isValid())
109 {
110 _readBlocked=false;
111 _writeBlocked=false;
112 this.notifyAll();
113 return false;
114 }
115
116 if (_readBlocked || _writeBlocked)
117 {
118 if (_readBlocked && _key.isReadable())
119 _readBlocked=false;
120 if (_writeBlocked && _key.isWritable())
121 _writeBlocked=false;
122
123
124 this.notifyAll();
125
126
127 _key.interestOps(0);
128 return false;
129 }
130
131 if (!assumeShortDispatch)
132 _key.interestOps(0);
133
134
135 if (_dispatched)
136 {
137
138 _key.interestOps(0);
139 return false;
140 }
141
142
143 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
144 {
145
146 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
147 _key.interestOps(_interestOps);
148 _writable = true;
149 }
150
151 _dispatched = true;
152 }
153 return true;
154 }
155
156
157 public void scheduleIdle()
158 {
159 _selectSet.scheduleIdle(_timeoutTask);
160 }
161
162
163 public void cancelIdle()
164 {
165 _selectSet.cancelIdle(_timeoutTask);
166 }
167
168
169
170 protected void idleExpired()
171 {
172 try
173 {
174 close();
175 }
176 catch (IOException e)
177 {
178 Log.ignore(e);
179 }
180 }
181
182
183
184
185
186
187 public void undispatch()
188 {
189 synchronized (this)
190 {
191 try
192 {
193 _dispatched = false;
194 updateKey();
195 }
196 catch (Exception e)
197 {
198
199 Log.ignore(e);
200 _interestOps = -1;
201 _selectSet.addChange(this);
202 }
203 }
204 }
205
206
207
208
209 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
210 {
211 int l = super.flush(header, buffer, trailer);
212 _writable = l > 0;
213 return l;
214 }
215
216
217
218
219 public int flush(Buffer buffer) throws IOException
220 {
221 int l = super.flush(buffer);
222 _writable = l > 0;
223 return l;
224 }
225
226
227
228
229
230 public boolean blockReadable(long timeoutMs) throws IOException
231 {
232 synchronized (this)
233 {
234 long start=_selectSet.getNow();
235 try
236 {
237 _readBlocked=true;
238 while (isOpen() && _readBlocked)
239 {
240 try
241 {
242 updateKey();
243 this.wait(timeoutMs);
244
245 if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
246 return false;
247 }
248 catch (InterruptedException e)
249 {
250 Log.warn(e);
251 }
252 }
253 }
254 finally
255 {
256 _readBlocked=false;
257 }
258 }
259 return true;
260 }
261
262
263
264
265
266 public boolean blockWritable(long timeoutMs) throws IOException
267 {
268 synchronized (this)
269 {
270 long start=_selectSet.getNow();
271 try
272 {
273 _writeBlocked=true;
274 while (isOpen() && _writeBlocked)
275 {
276 try
277 {
278 updateKey();
279 this.wait(timeoutMs);
280
281 if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
282 return false;
283 }
284 catch (InterruptedException e)
285 {
286 Log.warn(e);
287 }
288 }
289 }
290 finally
291 {
292 _writeBlocked=false;
293 scheduleIdle();
294 }
295 }
296 return true;
297 }
298
299
300 public void setWritable(boolean writable)
301 {
302 _writable=writable;
303 }
304
305
306 public void scheduleWrite()
307 {
308 _writable=false;
309 updateKey();
310 }
311
312
313
314
315
316
317
318 private void updateKey()
319 {
320 synchronized (this)
321 {
322 int ops=-1;
323 if (getChannel().isOpen())
324 {
325 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
326 _interestOps =
327 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
328 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
329 }
330 if(_interestOps == ops && getChannel().isOpen())
331 return;
332
333 }
334 _selectSet.addChange(this);
335 _selectSet.wakeup();
336 }
337
338
339
340
341
342 void doUpdateKey()
343 {
344 synchronized (this)
345 {
346 if (getChannel().isOpen())
347 {
348 if (_interestOps>0)
349 {
350 if (_key==null || !_key.isValid())
351 {
352 SelectableChannel sc = (SelectableChannel)getChannel();
353 if (sc.isRegistered())
354 {
355 updateKey();
356 }
357 else
358 {
359 try
360 {
361 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
362 }
363 catch (Exception e)
364 {
365 Log.ignore(e);
366 if (_key!=null && _key.isValid())
367 {
368 _key.cancel();
369 }
370 cancelIdle();
371 _manager.endPointClosed(this);
372 _key = null;
373 }
374 }
375 }
376 else
377 {
378 _key.interestOps(_interestOps);
379 }
380 }
381 else
382 {
383 if (_key!=null && _key.isValid())
384 _key.interestOps(0);
385 else
386 _key=null;
387 }
388 }
389 else
390 {
391 if (_key!=null && _key.isValid())
392 {
393 _key.interestOps(0);
394 _key.cancel();
395 }
396 cancelIdle();
397 _manager.endPointClosed(this);
398 _key = null;
399 }
400 }
401 }
402
403
404
405
406 public void run()
407 {
408 try
409 {
410 _connection.handle();
411 }
412 catch (ClosedChannelException e)
413 {
414 Log.ignore(e);
415 }
416 catch (EofException e)
417 {
418 Log.debug("EOF", e);
419 try{close();}
420 catch(IOException e2){Log.ignore(e2);}
421 }
422 catch (HttpException e)
423 {
424 Log.debug("BAD", e);
425 try{close();}
426 catch(IOException e2){Log.ignore(e2);}
427 }
428 catch (Throwable e)
429 {
430 Log.warn("handle failed", e);
431 try{close();}
432 catch(IOException e2){Log.ignore(e2);}
433 }
434 finally
435 {
436 undispatch();
437 }
438 }
439
440
441
442
443
444 public void close() throws IOException
445 {
446 try
447 {
448 super.close();
449 }
450 catch (IOException e)
451 {
452 Log.ignore(e);
453 }
454 finally
455 {
456 updateKey();
457 }
458 }
459
460
461 public String toString()
462 {
463 return "SCEP@" + hashCode() + "[d=" + _dispatched + ",io=" + _interestOps + ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
464 }
465
466
467 public Timeout.Task getTimeoutTask()
468 {
469 return _timeoutTask;
470 }
471
472
473 public SelectSet getSelectSet()
474 {
475 return _selectSet;
476 }
477
478
479
480
481 public class IdleTask extends Timeout.Task
482 {
483
484
485
486
487 public void expired()
488 {
489 idleExpired();
490 }
491
492 public String toString()
493 {
494 return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
495 }
496
497 }
498
499 }