1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import java.util.HashMap;
19 import java.util.LinkedHashMap;
20 import java.util.Map;
21 import java.util.NoSuchElementException;
22
23 import org.jboss.netty.logging.InternalLogger;
24 import org.jboss.netty.logging.InternalLoggerFactory;
25
26
27
28
29
30
31
32
33
34
35
36
37 public class DefaultChannelPipeline implements ChannelPipeline {
38
39 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
40 static final ChannelSink discardingSink = new DiscardingChannelSink();
41
42 private volatile Channel channel;
43 private volatile ChannelSink sink;
44 private volatile DefaultChannelHandlerContext head;
45 private volatile DefaultChannelHandlerContext tail;
46 private final Map<String, DefaultChannelHandlerContext> name2ctx =
47 new HashMap<String, DefaultChannelHandlerContext>(4);
48
49
50
51
52 public DefaultChannelPipeline() {
53 super();
54 }
55
56 public Channel getChannel() {
57 return channel;
58 }
59
60 public ChannelSink getSink() {
61 ChannelSink sink = this.sink;
62 if (sink == null) {
63 return discardingSink;
64 }
65 return sink;
66 }
67
68 public void attach(Channel channel, ChannelSink sink) {
69 if (channel == null) {
70 throw new NullPointerException("channel");
71 }
72 if (sink == null) {
73 throw new NullPointerException("sink");
74 }
75 if (this.channel != null || this.sink != null) {
76 throw new IllegalStateException("attached already");
77 }
78 this.channel = channel;
79 this.sink = sink;
80 }
81
82 public boolean isAttached() {
83 return sink != null;
84 }
85
86 public synchronized void addFirst(String name, ChannelHandler handler) {
87 if (name2ctx.isEmpty()) {
88 init(name, handler);
89 } else {
90 checkDuplicateName(name);
91 DefaultChannelHandlerContext oldHead = head;
92 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
93
94 callBeforeAdd(newHead);
95
96 oldHead.prev = newHead;
97 head = newHead;
98 name2ctx.put(name, newHead);
99
100 callAfterAdd(newHead);
101 }
102 }
103
104 public synchronized void addLast(String name, ChannelHandler handler) {
105 if (name2ctx.isEmpty()) {
106 init(name, handler);
107 } else {
108 checkDuplicateName(name);
109 DefaultChannelHandlerContext oldTail = tail;
110 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
111
112 callBeforeAdd(newTail);
113
114 oldTail.next = newTail;
115 tail = newTail;
116 name2ctx.put(name, newTail);
117
118 callAfterAdd(newTail);
119 }
120 }
121
122 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
123 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
124 if (ctx == head) {
125 addFirst(name, handler);
126 } else {
127 checkDuplicateName(name);
128 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
129
130 callBeforeAdd(newCtx);
131
132 ctx.prev.next = newCtx;
133 ctx.prev = newCtx;
134 name2ctx.put(name, newCtx);
135
136 callAfterAdd(newCtx);
137 }
138 }
139
140 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
141 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
142 if (ctx == tail) {
143 addLast(name, handler);
144 } else {
145 checkDuplicateName(name);
146 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
147
148 callBeforeAdd(newCtx);
149
150 ctx.next.prev = newCtx;
151 ctx.next = newCtx;
152 name2ctx.put(name, newCtx);
153
154 callAfterAdd(newCtx);
155 }
156 }
157
158 public synchronized void remove(ChannelHandler handler) {
159 remove(getContextOrDie(handler));
160 }
161
162 public synchronized ChannelHandler remove(String name) {
163 return remove(getContextOrDie(name)).getHandler();
164 }
165
166 @SuppressWarnings("unchecked")
167 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
168 return (T) remove(getContextOrDie(handlerType)).getHandler();
169 }
170
171 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
172 if (head == tail) {
173 head = tail = null;
174 name2ctx.clear();
175 } else if (ctx == head) {
176 removeFirst();
177 } else if (ctx == tail) {
178 removeLast();
179 } else {
180 callBeforeRemove(ctx);
181
182 DefaultChannelHandlerContext prev = ctx.prev;
183 DefaultChannelHandlerContext next = ctx.next;
184 prev.next = next;
185 next.prev = prev;
186 name2ctx.remove(ctx.getName());
187
188 callAfterRemove(ctx);
189 }
190 return ctx;
191 }
192
193 public synchronized ChannelHandler removeFirst() {
194 if (name2ctx.isEmpty()) {
195 throw new NoSuchElementException();
196 }
197
198 DefaultChannelHandlerContext oldHead = head;
199 if (oldHead == null) {
200 throw new NoSuchElementException();
201 }
202
203 callBeforeRemove(oldHead);
204
205 if (oldHead.next == null) {
206 head = tail = null;
207 name2ctx.clear();
208 } else {
209 oldHead.next.prev = null;
210 head = oldHead.next;
211 name2ctx.remove(oldHead.getName());
212 }
213
214 callAfterRemove(oldHead);
215
216 return oldHead.getHandler();
217 }
218
219 public synchronized ChannelHandler removeLast() {
220 if (name2ctx.isEmpty()) {
221 throw new NoSuchElementException();
222 }
223
224 DefaultChannelHandlerContext oldTail = tail;
225 if (oldTail == null) {
226 throw new NoSuchElementException();
227 }
228
229 callBeforeRemove(oldTail);
230
231 if (oldTail.prev == null) {
232 head = tail = null;
233 name2ctx.clear();
234 } else {
235 oldTail.prev.next = null;
236 tail = oldTail.prev;
237 name2ctx.remove(oldTail.getName());
238 }
239
240 callBeforeRemove(oldTail);
241
242 return oldTail.getHandler();
243 }
244
245 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
246 replace(getContextOrDie(oldHandler), newName, newHandler);
247 }
248
249 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
250 return replace(getContextOrDie(oldName), newName, newHandler);
251 }
252
253 @SuppressWarnings("unchecked")
254 public synchronized <T extends ChannelHandler> T replace(
255 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
256 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
257 }
258
259 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
260 if (ctx == head) {
261 removeFirst();
262 addFirst(newName, newHandler);
263 } else if (ctx == tail) {
264 removeLast();
265 addLast(newName, newHandler);
266 } else {
267 boolean sameName = ctx.getName().equals(newName);
268 if (!sameName) {
269 checkDuplicateName(newName);
270 }
271
272 DefaultChannelHandlerContext prev = ctx.prev;
273 DefaultChannelHandlerContext next = ctx.next;
274 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
275
276 callBeforeRemove(ctx);
277 callBeforeAdd(newCtx);
278
279 prev.next = newCtx;
280 next.prev = newCtx;
281
282 if (!sameName) {
283 name2ctx.remove(ctx.getName());
284 name2ctx.put(newName, newCtx);
285 }
286
287 ChannelHandlerLifeCycleException removeException = null;
288 ChannelHandlerLifeCycleException addException = null;
289 boolean removed = false;
290 try {
291 callAfterRemove(ctx);
292 removed = true;
293 } catch (ChannelHandlerLifeCycleException e) {
294 removeException = e;
295 }
296
297 boolean added = false;
298 try {
299 callAfterAdd(newCtx);
300 added = true;
301 } catch (ChannelHandlerLifeCycleException e) {
302 addException = e;
303 }
304
305 if (!removed && !added) {
306 logger.warn(removeException.getMessage(), removeException);
307 logger.warn(addException.getMessage(), addException);
308 throw new ChannelHandlerLifeCycleException(
309 "Both " + ctx.getHandler().getClass().getName() +
310 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
311 ".afterAdd() failed; see logs.");
312 } else if (!removed) {
313 throw removeException;
314 } else if (!added) {
315 throw addException;
316 }
317 }
318
319 return ctx.getHandler();
320 }
321
322 private void callBeforeAdd(ChannelHandlerContext ctx) {
323 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
324 return;
325 }
326
327 LifeCycleAwareChannelHandler h =
328 (LifeCycleAwareChannelHandler) ctx.getHandler();
329
330 try {
331 h.beforeAdd(ctx);
332 } catch (Throwable t) {
333 throw new ChannelHandlerLifeCycleException(
334 h.getClass().getName() +
335 ".beforeAdd() has thrown an exception; not adding.", t);
336 }
337 }
338
339 private void callAfterAdd(ChannelHandlerContext ctx) {
340 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
341 return;
342 }
343
344 LifeCycleAwareChannelHandler h =
345 (LifeCycleAwareChannelHandler) ctx.getHandler();
346
347 try {
348 h.afterAdd(ctx);
349 } catch (Throwable t) {
350 boolean removed = false;
351 try {
352 remove((DefaultChannelHandlerContext) ctx);
353 removed = true;
354 } catch (Throwable t2) {
355 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
356 }
357
358 if (removed) {
359 throw new ChannelHandlerLifeCycleException(
360 h.getClass().getName() +
361 ".afterAdd() has thrown an exception; removed.", t);
362 } else {
363 throw new ChannelHandlerLifeCycleException(
364 h.getClass().getName() +
365 ".afterAdd() has thrown an exception; also failed to remove.", t);
366 }
367 }
368 }
369
370 private void callBeforeRemove(ChannelHandlerContext ctx) {
371 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
372 return;
373 }
374
375 LifeCycleAwareChannelHandler h =
376 (LifeCycleAwareChannelHandler) ctx.getHandler();
377
378 try {
379 h.beforeRemove(ctx);
380 } catch (Throwable t) {
381 throw new ChannelHandlerLifeCycleException(
382 h.getClass().getName() +
383 ".beforeRemove() has thrown an exception; not removing.", t);
384 }
385 }
386
387 private void callAfterRemove(ChannelHandlerContext ctx) {
388 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
389 return;
390 }
391
392 LifeCycleAwareChannelHandler h =
393 (LifeCycleAwareChannelHandler) ctx.getHandler();
394
395 try {
396 h.afterRemove(ctx);
397 } catch (Throwable t) {
398 throw new ChannelHandlerLifeCycleException(
399 h.getClass().getName() +
400 ".afterRemove() has thrown an exception.", t);
401 }
402 }
403
404 public synchronized ChannelHandler getFirst() {
405 DefaultChannelHandlerContext head = this.head;
406 if (head == null) {
407 return null;
408 }
409 return head.getHandler();
410 }
411
412 public synchronized ChannelHandler getLast() {
413 DefaultChannelHandlerContext tail = this.tail;
414 if (tail == null) {
415 return null;
416 }
417 return tail.getHandler();
418 }
419
420 public synchronized ChannelHandler get(String name) {
421 DefaultChannelHandlerContext ctx = name2ctx.get(name);
422 if (ctx == null) {
423 return null;
424 } else {
425 return ctx.getHandler();
426 }
427 }
428
429 @SuppressWarnings("unchecked")
430 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
431 ChannelHandlerContext ctx = getContext(handlerType);
432 if (ctx == null) {
433 return null;
434 } else {
435 return (T) ctx.getHandler();
436 }
437 }
438
439 public synchronized ChannelHandlerContext getContext(String name) {
440 if (name == null) {
441 throw new NullPointerException("name");
442 }
443 return name2ctx.get(name);
444 }
445
446 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
447 if (handler == null) {
448 throw new NullPointerException("handler");
449 }
450 if (name2ctx.isEmpty()) {
451 return null;
452 }
453 DefaultChannelHandlerContext ctx = head;
454 for (;;) {
455 if (ctx.getHandler() == handler) {
456 return ctx;
457 }
458
459 ctx = ctx.next;
460 if (ctx == null) {
461 break;
462 }
463 }
464 return null;
465 }
466
467 public synchronized ChannelHandlerContext getContext(
468 Class<? extends ChannelHandler> handlerType) {
469 if (handlerType == null) {
470 throw new NullPointerException("handlerType");
471 }
472
473 if (name2ctx.isEmpty()) {
474 return null;
475 }
476 DefaultChannelHandlerContext ctx = head;
477 for (;;) {
478 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
479 return ctx;
480 }
481
482 ctx = ctx.next;
483 if (ctx == null) {
484 break;
485 }
486 }
487 return null;
488 }
489
490 public Map<String, ChannelHandler> toMap() {
491 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
492 if (name2ctx.isEmpty()) {
493 return map;
494 }
495
496 DefaultChannelHandlerContext ctx = head;
497 for (;;) {
498 map.put(ctx.getName(), ctx.getHandler());
499 ctx = ctx.next;
500 if (ctx == null) {
501 break;
502 }
503 }
504 return map;
505 }
506
507
508
509
510 @Override
511 public String toString() {
512 StringBuilder buf = new StringBuilder();
513 buf.append(getClass().getSimpleName());
514 buf.append('{');
515 DefaultChannelHandlerContext ctx = head;
516 for (;;) {
517 buf.append('(');
518 buf.append(ctx.getName());
519 buf.append(" = ");
520 buf.append(ctx.getHandler().getClass().getName());
521 buf.append(')');
522 ctx = ctx.next;
523 if (ctx == null) {
524 break;
525 }
526 buf.append(", ");
527 }
528 buf.append('}');
529 return buf.toString();
530 }
531
532 public void sendUpstream(ChannelEvent e) {
533 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
534 if (head == null) {
535 logger.warn(
536 "The pipeline contains no upstream handlers; discarding: " + e);
537 return;
538 }
539
540 sendUpstream(head, e);
541 }
542
543 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
544 try {
545 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
546 } catch (Throwable t) {
547 notifyHandlerException(e, t);
548 }
549 }
550
551 public void sendDownstream(ChannelEvent e) {
552 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
553 if (tail == null) {
554 try {
555 getSink().eventSunk(this, e);
556 return;
557 } catch (Throwable t) {
558 notifyHandlerException(e, t);
559 return;
560 }
561 }
562
563 sendDownstream(tail, e);
564 }
565
566 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
567 try {
568 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
569 } catch (Throwable t) {
570 notifyHandlerException(e, t);
571 }
572 }
573
574 DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
575 if (ctx == null) {
576 return null;
577 }
578
579 DefaultChannelHandlerContext realCtx = ctx;
580 while (!realCtx.canHandleUpstream()) {
581 realCtx = realCtx.next;
582 if (realCtx == null) {
583 return null;
584 }
585 }
586
587 return realCtx;
588 }
589
590 DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
591 if (ctx == null) {
592 return null;
593 }
594
595 DefaultChannelHandlerContext realCtx = ctx;
596 while (!realCtx.canHandleDownstream()) {
597 realCtx = realCtx.prev;
598 if (realCtx == null) {
599 return null;
600 }
601 }
602
603 return realCtx;
604 }
605
606 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
607 if (e instanceof ExceptionEvent) {
608 logger.warn(
609 "An exception was thrown by a user handler " +
610 "while handling an exception event (" + e + ")", t);
611 return;
612 }
613
614 ChannelPipelineException pe;
615 if (t instanceof ChannelPipelineException) {
616 pe = (ChannelPipelineException) t;
617 } else {
618 pe = new ChannelPipelineException(t);
619 }
620
621 try {
622 sink.exceptionCaught(this, e, pe);
623 } catch (Exception e1) {
624 logger.warn("An exception was thrown by an exception handler.", e1);
625 }
626 }
627
628 private void init(String name, ChannelHandler handler) {
629 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
630 callBeforeAdd(ctx);
631 head = tail = ctx;
632 name2ctx.clear();
633 name2ctx.put(name, ctx);
634 callAfterAdd(ctx);
635 }
636
637 private void checkDuplicateName(String name) {
638 if (name2ctx.containsKey(name)) {
639 throw new IllegalArgumentException("Duplicate handler name.");
640 }
641 }
642
643 private DefaultChannelHandlerContext getContextOrDie(String name) {
644 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
645 if (ctx == null) {
646 throw new NoSuchElementException(name);
647 } else {
648 return ctx;
649 }
650 }
651
652 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
653 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
654 if (ctx == null) {
655 throw new NoSuchElementException(handler.getClass().getName());
656 } else {
657 return ctx;
658 }
659 }
660
661 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
662 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
663 if (ctx == null) {
664 throw new NoSuchElementException(handlerType.getName());
665 } else {
666 return ctx;
667 }
668 }
669
670 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
671 volatile DefaultChannelHandlerContext next;
672 volatile DefaultChannelHandlerContext prev;
673 private final String name;
674 private final ChannelHandler handler;
675 private final boolean canHandleUpstream;
676 private final boolean canHandleDownstream;
677 private volatile Object attachment;
678
679 DefaultChannelHandlerContext(
680 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
681 String name, ChannelHandler handler) {
682
683 if (name == null) {
684 throw new NullPointerException("name");
685 }
686 if (handler == null) {
687 throw new NullPointerException("handler");
688 }
689 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
690 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
691
692
693 if (!canHandleUpstream && !canHandleDownstream) {
694 throw new IllegalArgumentException(
695 "handler must be either " +
696 ChannelUpstreamHandler.class.getName() + " or " +
697 ChannelDownstreamHandler.class.getName() + '.');
698 }
699
700 this.prev = prev;
701 this.next = next;
702 this.name = name;
703 this.handler = handler;
704 }
705
706 public Channel getChannel() {
707 return getPipeline().getChannel();
708 }
709
710 public ChannelPipeline getPipeline() {
711 return DefaultChannelPipeline.this;
712 }
713
714 public boolean canHandleDownstream() {
715 return canHandleDownstream;
716 }
717
718 public boolean canHandleUpstream() {
719 return canHandleUpstream;
720 }
721
722 public ChannelHandler getHandler() {
723 return handler;
724 }
725
726 public String getName() {
727 return name;
728 }
729
730 public Object getAttachment() {
731 return attachment;
732 }
733
734 public void setAttachment(Object attachment) {
735 this.attachment = attachment;
736 }
737
738 public void sendDownstream(ChannelEvent e) {
739 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
740 if (prev == null) {
741 try {
742 getSink().eventSunk(DefaultChannelPipeline.this, e);
743 } catch (Throwable t) {
744 notifyHandlerException(e, t);
745 }
746 } else {
747 DefaultChannelPipeline.this.sendDownstream(prev, e);
748 }
749 }
750
751 public void sendUpstream(ChannelEvent e) {
752 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
753 if (next != null) {
754 DefaultChannelPipeline.this.sendUpstream(next, e);
755 }
756 }
757 }
758
759 private static final class DiscardingChannelSink implements ChannelSink {
760 DiscardingChannelSink() {
761 super();
762 }
763
764 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
765 logger.warn("Not attached yet; discarding: " + e);
766 }
767
768 public void exceptionCaught(ChannelPipeline pipeline,
769 ChannelEvent e, ChannelPipelineException cause) throws Exception {
770 throw cause;
771 }
772 }
773 }