1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.io.nio;
16
17 import java.io.IOException;
18 import java.nio.channels.ClosedChannelException;
19 import java.nio.channels.SelectableChannel;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.SocketChannel;
22
23 import org.mortbay.io.Buffer;
24 import org.mortbay.io.Connection;
25 import org.mortbay.io.nio.SelectorManager.SelectSet;
26 import org.mortbay.jetty.EofException;
27 import org.mortbay.jetty.HttpException;
28 import org.mortbay.log.Log;
29 import org.mortbay.thread.Timeout;
30
31
32
33
34
35
36
37
38 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable
39 {
40 protected SelectorManager _manager;
41 protected SelectorManager.SelectSet _selectSet;
42 protected boolean _dispatched = false;
43 protected boolean _writable = true;
44 protected SelectionKey _key;
45 protected int _interestOps;
46 protected boolean _readBlocked;
47 protected boolean _writeBlocked;
48 protected Connection _connection;
49
50 private Timeout.Task _timeoutTask = new IdleTask();
51
52
53 public Connection getConnection()
54 {
55 return _connection;
56 }
57
58
59 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
60 {
61 super(channel);
62
63 _manager = selectSet.getManager();
64 _selectSet = selectSet;
65 _connection = _manager.newConnection(channel,this);
66
67 _manager.endPointOpened(this);
68
69 _key = key;
70 }
71
72
73 void dispatch() throws IOException
74 {
75 boolean dispatch_done = true;
76 try
77 {
78 if (dispatch(_manager.isDelaySelectKeyUpdate()))
79 {
80 dispatch_done= false;
81 dispatch_done = _manager.dispatch((Runnable)this);
82 }
83 }
84 finally
85 {
86 if (!dispatch_done)
87 {
88 Log.warn("dispatch failed!");
89 undispatch();
90 }
91 }
92 }
93
94
95
96
97
98
99
100
101
102
103 public boolean dispatch(boolean assumeShortDispatch) throws IOException
104 {
105
106 synchronized (this)
107 {
108 if (_key == null || !_key.isValid())
109 {
110 _readBlocked=false;
111 _writeBlocked=false;
112 this.notifyAll();
113 return false;
114 }
115
116 if (_readBlocked || _writeBlocked)
117 {
118 if (_readBlocked && _key.isReadable())
119 _readBlocked=false;
120 if (_writeBlocked && _key.isWritable())
121 _writeBlocked=false;
122
123
124 this.notifyAll();
125
126
127 _key.interestOps(0);
128 return false;
129 }
130
131 if (!assumeShortDispatch)
132 _key.interestOps(0);
133
134
135 if (_dispatched)
136 {
137
138 _key.interestOps(0);
139 return false;
140 }
141
142
143 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
144 {
145
146 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
147 _key.interestOps(_interestOps);
148 _writable = true;
149 }
150
151 _dispatched = true;
152 }
153 return true;
154 }
155
156
157 public void scheduleIdle()
158 {
159 _selectSet.scheduleIdle(_timeoutTask);
160 }
161
162
163 public void cancelIdle()
164 {
165 _selectSet.cancelIdle(_timeoutTask);
166 }
167
168
169
170 protected void idleExpired()
171 {
172 try
173 {
174 close();
175 }
176 catch (IOException e)
177 {
178 Log.ignore(e);
179 }
180 }
181
182
183
184
185
186
187 public void undispatch()
188 {
189 synchronized (this)
190 {
191 try
192 {
193 _dispatched = false;
194 updateKey();
195 }
196 catch (Exception e)
197 {
198
199 Log.ignore(e);
200 _interestOps = -1;
201 _selectSet.addChange(this);
202 }
203 }
204 }
205
206
207
208
209 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
210 {
211 int l = super.flush(header, buffer, trailer);
212 _writable = l > 0;
213 return l;
214 }
215
216
217
218
219 public int flush(Buffer buffer) throws IOException
220 {
221 int l = super.flush(buffer);
222 _writable = l > 0;
223 return l;
224 }
225
226
227
228
229
230 public boolean blockReadable(long timeoutMs) throws IOException
231 {
232 synchronized (this)
233 {
234 long start=_selectSet.getNow();
235 try
236 {
237 _readBlocked=true;
238 while (isOpen() && _readBlocked)
239 {
240 try
241 {
242 updateKey();
243 this.wait(timeoutMs);
244
245 if (_readBlocked && timeoutMs<(_selectSet.getNow()-start))
246 return false;
247 }
248 catch (InterruptedException e)
249 {
250 Log.warn(e);
251 }
252 }
253 }
254 finally
255 {
256 _readBlocked=false;
257 }
258 }
259 return true;
260 }
261
262
263
264
265
266 public boolean blockWritable(long timeoutMs) throws IOException
267 {
268 synchronized (this)
269 {
270 long start=_selectSet.getNow();
271 try
272 {
273 _writeBlocked=true;
274 while (isOpen() && _writeBlocked)
275 {
276 try
277 {
278 updateKey();
279 this.wait(timeoutMs);
280
281 if (_writeBlocked && timeoutMs<(_selectSet.getNow()-start))
282 return false;
283 }
284 catch (InterruptedException e)
285 {
286 Log.warn(e);
287 }
288 }
289 }
290 finally
291 {
292 _writeBlocked=false;
293 }
294 }
295 return true;
296 }
297
298
299 public void setWritable(boolean writable)
300 {
301 _writable=writable;
302 }
303
304
305 public void scheduleWrite()
306 {
307 _writable=false;
308 updateKey();
309 }
310
311
312
313
314
315
316
317 private void updateKey()
318 {
319 synchronized (this)
320 {
321 int ops=-1;
322 if (getChannel().isOpen())
323 {
324 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
325 _interestOps =
326 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
327 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
328 }
329 if(_interestOps == ops && getChannel().isOpen())
330 return;
331
332 }
333 _selectSet.addChange(this);
334 _selectSet.wakeup();
335 }
336
337
338
339
340
341 void doUpdateKey()
342 {
343 synchronized (this)
344 {
345 if (getChannel().isOpen())
346 {
347 if (_interestOps>0)
348 {
349 if (_key==null || !_key.isValid())
350 {
351 SelectableChannel sc = (SelectableChannel)getChannel();
352 if (sc.isRegistered())
353 {
354 updateKey();
355 }
356 else
357 {
358 try
359 {
360 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
361 }
362 catch (Exception e)
363 {
364 Log.ignore(e);
365 if (_key!=null && _key.isValid())
366 {
367 _key.cancel();
368 }
369 cancelIdle();
370 _manager.endPointClosed(this);
371 _key = null;
372 }
373 }
374 }
375 else
376 {
377 _key.interestOps(_interestOps);
378 }
379 }
380 else
381 {
382 if (_key.isValid())
383 _key.interestOps(0);
384 else
385 _key=null;
386 }
387 }
388 else
389 {
390 if (_key!=null && _key.isValid())
391 {
392 _key.interestOps(0);
393 _key.cancel();
394 }
395 cancelIdle();
396 _manager.endPointClosed(this);
397 _key = null;
398 }
399 }
400 }
401
402
403
404
405 public void run()
406 {
407 try
408 {
409 _connection.handle();
410 }
411 catch (ClosedChannelException e)
412 {
413 Log.ignore(e);
414 }
415 catch (EofException e)
416 {
417 Log.debug("EOF", e);
418 try{close();}
419 catch(IOException e2){Log.ignore(e2);}
420 }
421 catch (HttpException e)
422 {
423 Log.debug("BAD", e);
424 try{close();}
425 catch(IOException e2){Log.ignore(e2);}
426 }
427 catch (Throwable e)
428 {
429 Log.warn("handle failed", e);
430 try{close();}
431 catch(IOException e2){Log.ignore(e2);}
432 }
433 finally
434 {
435 undispatch();
436 }
437 }
438
439
440
441
442
443 public void close() throws IOException
444 {
445 try
446 {
447 super.close();
448 }
449 catch (IOException e)
450 {
451 Log.ignore(e);
452 }
453 finally
454 {
455 updateKey();
456 }
457 }
458
459
460 public String toString()
461 {
462 return "SCEP@" + hashCode() + "[d=" + _dispatched + ",io=" + _interestOps + ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
463 }
464
465
466 public Timeout.Task getTimeoutTask()
467 {
468 return _timeoutTask;
469 }
470
471
472 public SelectSet getSelectSet()
473 {
474 return _selectSet;
475 }
476
477
478
479
480 public class IdleTask extends Timeout.Task
481 {
482
483
484
485
486 public void expire()
487 {
488 idleExpired();
489 }
490
491 public String toString()
492 {
493 return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
494 }
495
496 }
497
498 }