View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import java.util.HashMap;
19  import java.util.LinkedHashMap;
20  import java.util.Map;
21  
22  import org.jboss.netty.logging.InternalLogger;
23  import org.jboss.netty.logging.InternalLoggerFactory;
24  import org.jboss.netty.util.internal.ConversionUtil;
25  
26  /**
27   * A {@link ChannelPipeline} that might perform better at the cost of
28   * disabled dynamic insertion and removal of {@link ChannelHandler}s.
29   * An attempt to insert, remove, or replace a handler in this pipeline will
30   * trigger an {@link UnsupportedOperationException}.
31   *
32   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
33   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
34   *
35   * @version $Rev: 2267 $, $Date: 2010-05-06 16:00:52 +0900 (Thu, 06 May 2010) $
36   *
37   */
38  public class StaticChannelPipeline implements ChannelPipeline {
39  
40      // FIXME Code duplication with DefaultChannelPipeline
41      static final InternalLogger logger = InternalLoggerFactory.getInstance(StaticChannelPipeline.class);
42  
43      private volatile Channel channel;
44      private volatile ChannelSink sink;
45      private final StaticChannelHandlerContext[] contexts;
46      private final int lastIndex;
47      private final Map<String, StaticChannelHandlerContext> name2ctx =
48          new HashMap<String, StaticChannelHandlerContext>(4);
49  
50      /**
51       * Creates a new pipeline from the specified handlers.
52       * The names of the specified handlers are generated automatically;
53       * the first handler's name is {@code "0"}, the second handler's name is
54       * {@code "1"}, the third handler's name is {@code "2"}, and so on.
55       */
56      public StaticChannelPipeline(ChannelHandler... handlers) {
57          if (handlers == null) {
58              throw new NullPointerException("handlers");
59          }
60          if (handlers.length == 0) {
61              throw new IllegalArgumentException("no handlers specified");
62          }
63  
64          // Get the number of first non-null handlers.
65          StaticChannelHandlerContext[] contexts =
66              new StaticChannelHandlerContext[handlers.length];
67          int nContexts;
68          for (nContexts = 0; nContexts < contexts.length; nContexts ++) {
69              ChannelHandler h = handlers[nContexts];
70              if (h == null) {
71                  break;
72              }
73          }
74  
75          if (nContexts == contexts.length) {
76              this.contexts = contexts;
77              lastIndex = contexts.length - 1;
78          } else {
79              this.contexts = contexts =
80                  new StaticChannelHandlerContext[nContexts];
81              lastIndex = nContexts - 1;
82          }
83  
84          // Initialize the first non-null handlers only.
85          for (int i = 0; i < nContexts; i ++) {
86              ChannelHandler h = handlers[i];
87              String name = ConversionUtil.toString(i);
88              StaticChannelHandlerContext ctx =
89                  new StaticChannelHandlerContext(i, name, h);
90              contexts[i] = ctx;
91              name2ctx.put(name, ctx);
92          }
93  
94          for (ChannelHandlerContext ctx: contexts) {
95              callBeforeAdd(ctx);
96              callAfterAdd(ctx);
97          }
98      }
99  
100     public Channel getChannel() {
101         return channel;
102     }
103 
104     public ChannelSink getSink() {
105         ChannelSink sink = this.sink;
106         if (sink == null) {
107             return DefaultChannelPipeline.discardingSink;
108         }
109         return sink;
110     }
111 
112     public void attach(Channel channel, ChannelSink sink) {
113         if (channel == null) {
114             throw new NullPointerException("channel");
115         }
116         if (sink == null) {
117             throw new NullPointerException("sink");
118         }
119         if (this.channel != null || this.sink != null) {
120             throw new IllegalStateException("attached already");
121         }
122         this.channel = channel;
123         this.sink = sink;
124     }
125 
126     public boolean isAttached() {
127         return sink != null;
128     }
129 
130     public void addFirst(String name, ChannelHandler handler) {
131         throw new UnsupportedOperationException();
132     }
133 
134     public void addLast(String name, ChannelHandler handler) {
135         throw new UnsupportedOperationException();
136     }
137 
138     public void addBefore(String baseName, String name, ChannelHandler handler) {
139         throw new UnsupportedOperationException();
140     }
141 
142     public void addAfter(String baseName, String name, ChannelHandler handler) {
143         throw new UnsupportedOperationException();
144     }
145 
146     public void remove(ChannelHandler handler) {
147         throw new UnsupportedOperationException();
148     }
149 
150     public ChannelHandler remove(String name) {
151         throw new UnsupportedOperationException();
152     }
153 
154     public <T extends ChannelHandler> T remove(Class<T> handlerType) {
155         throw new UnsupportedOperationException();
156     }
157 
158     public ChannelHandler removeFirst() {
159         throw new UnsupportedOperationException();
160     }
161 
162     public ChannelHandler removeLast() {
163         throw new UnsupportedOperationException();
164     }
165 
166     public void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
167         throw new UnsupportedOperationException();
168     }
169 
170     public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
171         throw new UnsupportedOperationException();
172     }
173 
174     public <T extends ChannelHandler> T replace(
175             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
176         throw new UnsupportedOperationException();
177     }
178 
179     private void callBeforeAdd(ChannelHandlerContext ctx) {
180         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
181             return;
182         }
183 
184         LifeCycleAwareChannelHandler h =
185             (LifeCycleAwareChannelHandler) ctx.getHandler();
186 
187         try {
188             h.beforeAdd(ctx);
189         } catch (Throwable t) {
190             throw new ChannelHandlerLifeCycleException(
191                     h.getClass().getName() +
192                     ".beforeAdd() has thrown an exception; not adding.", t);
193         }
194     }
195 
196     private void callAfterAdd(ChannelHandlerContext ctx) {
197         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
198             return;
199         }
200 
201         LifeCycleAwareChannelHandler h =
202             (LifeCycleAwareChannelHandler) ctx.getHandler();
203 
204         try {
205             h.afterAdd(ctx);
206         } catch (Throwable t) {
207             boolean removed = false;
208             try {
209                 callBeforeRemove(ctx);
210                 callAfterRemove(ctx);
211                 removed = true;
212             } catch (Throwable t2) {
213                 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
214             }
215 
216             if (removed) {
217                 throw new ChannelHandlerLifeCycleException(
218                         h.getClass().getName() +
219                         ".afterAdd() has thrown an exception; removed.", t);
220             } else {
221                 throw new ChannelHandlerLifeCycleException(
222                         h.getClass().getName() +
223                         ".afterAdd() has thrown an exception; also failed to remove.", t);
224             }
225         }
226     }
227 
228     private void callBeforeRemove(ChannelHandlerContext ctx) {
229         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
230             return;
231         }
232 
233         LifeCycleAwareChannelHandler h =
234             (LifeCycleAwareChannelHandler) ctx.getHandler();
235 
236         try {
237             h.beforeRemove(ctx);
238         } catch (Throwable t) {
239             throw new ChannelHandlerLifeCycleException(
240                     h.getClass().getName() +
241                     ".beforeRemove() has thrown an exception; not removing.", t);
242         }
243     }
244 
245     private void callAfterRemove(ChannelHandlerContext ctx) {
246         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
247             return;
248         }
249 
250         LifeCycleAwareChannelHandler h =
251             (LifeCycleAwareChannelHandler) ctx.getHandler();
252 
253         try {
254             h.afterRemove(ctx);
255         } catch (Throwable t) {
256             throw new ChannelHandlerLifeCycleException(
257                     h.getClass().getName() +
258                     ".afterRemove() has thrown an exception.", t);
259         }
260     }
261 
262     public ChannelHandler getFirst() {
263         return contexts[0].getHandler();
264     }
265 
266     public ChannelHandler getLast() {
267         return contexts[contexts.length - 1].getHandler();
268     }
269 
270     public ChannelHandler get(String name) {
271         StaticChannelHandlerContext ctx = name2ctx.get(name);
272         if (ctx == null) {
273             return null;
274         } else {
275             return ctx.getHandler();
276         }
277     }
278 
279     @SuppressWarnings("unchecked")
280     public <T extends ChannelHandler> T get(Class<T> handlerType) {
281         ChannelHandlerContext ctx = getContext(handlerType);
282         if (ctx == null) {
283             return null;
284         } else {
285             return (T) ctx.getHandler();
286         }
287     }
288 
289     public ChannelHandlerContext getContext(String name) {
290         if (name == null) {
291             throw new NullPointerException("name");
292         }
293         return name2ctx.get(name);
294     }
295 
296     public ChannelHandlerContext getContext(ChannelHandler handler) {
297         if (handler == null) {
298             throw new NullPointerException("handler");
299         }
300         for (StaticChannelHandlerContext ctx: contexts) {
301             if (ctx.getHandler() == handler) {
302                 return ctx;
303             }
304         }
305         return null;
306     }
307 
308     public ChannelHandlerContext getContext(Class<? extends ChannelHandler> handlerType) {
309         if (handlerType == null) {
310             throw new NullPointerException("handlerType");
311         }
312         for (StaticChannelHandlerContext ctx: contexts) {
313             if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
314                 return ctx;
315             }
316         }
317         return null;
318     }
319 
320     public Map<String, ChannelHandler> toMap() {
321         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
322         for (StaticChannelHandlerContext ctx: contexts) {
323             map.put(ctx.getName(), ctx.getHandler());
324         }
325         return map;
326     }
327 
328     /**
329      * Returns the {@link String} representation of this pipeline.
330      */
331     @Override
332     public String toString() {
333         StringBuilder buf = new StringBuilder();
334         buf.append(getClass().getSimpleName());
335         buf.append('{');
336 
337         for (StaticChannelHandlerContext ctx: contexts) {
338             buf.append('(');
339             buf.append(ctx.getName());
340             buf.append(" = ");
341             buf.append(ctx.getHandler().getClass().getName());
342             buf.append(')');
343             buf.append(", ");
344         }
345         buf.replace(buf.length() - 2, buf.length(), "}");
346         return buf.toString();
347     }
348 
349     public void sendUpstream(ChannelEvent e) {
350         StaticChannelHandlerContext head = getActualUpstreamContext(0);
351         if (head == null) {
352             logger.warn(
353                     "The pipeline contains no upstream handlers; discarding: " + e);
354             return;
355         }
356 
357         sendUpstream(head, e);
358     }
359 
360     void sendUpstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
361         try {
362             ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
363         } catch (Throwable t) {
364             notifyHandlerException(e, t);
365         }
366     }
367 
368     public void sendDownstream(ChannelEvent e) {
369         StaticChannelHandlerContext tail = getActualDownstreamContext(lastIndex);
370         if (tail == null) {
371             try {
372                 getSink().eventSunk(this, e);
373                 return;
374             } catch (Throwable t) {
375                 notifyHandlerException(e, t);
376                 return;
377             }
378         }
379 
380         sendDownstream(tail, e);
381     }
382 
383     void sendDownstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
384         try {
385             ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
386         } catch (Throwable t) {
387             notifyHandlerException(e, t);
388         }
389     }
390 
391     StaticChannelHandlerContext getActualUpstreamContext(int index) {
392         for (int i = index; i < contexts.length; i ++) {
393             StaticChannelHandlerContext ctx = contexts[i];
394             if (ctx.canHandleUpstream()) {
395                 return ctx;
396             }
397         }
398         return null;
399     }
400 
401     StaticChannelHandlerContext getActualDownstreamContext(int index) {
402         for (int i = index; i >= 0; i --) {
403             StaticChannelHandlerContext ctx = contexts[i];
404             if (ctx.canHandleDownstream()) {
405                 return ctx;
406             }
407         }
408         return null;
409     }
410 
411     protected void notifyHandlerException(ChannelEvent e, Throwable t) {
412         if (e instanceof ExceptionEvent) {
413             logger.warn(
414                     "An exception was thrown by a user handler " +
415                     "while handling an exception event (" + e + ")", t);
416             return;
417         }
418 
419         ChannelPipelineException pe;
420         if (t instanceof ChannelPipelineException) {
421             pe = (ChannelPipelineException) t;
422         } else {
423             pe = new ChannelPipelineException(t);
424         }
425 
426         try {
427             sink.exceptionCaught(this, e, pe);
428         } catch (Exception e1) {
429             logger.warn("An exception was thrown by an exception handler.", e1);
430         }
431     }
432 
433     private final class StaticChannelHandlerContext implements ChannelHandlerContext {
434         private final int index;
435         private final String name;
436         private final ChannelHandler handler;
437         private final boolean canHandleUpstream;
438         private final boolean canHandleDownstream;
439         private volatile Object attachment;
440 
441         StaticChannelHandlerContext(
442                 int index, String name, ChannelHandler handler) {
443 
444             if (name == null) {
445                 throw new NullPointerException("name");
446             }
447             if (handler == null) {
448                 throw new NullPointerException("handler");
449             }
450             canHandleUpstream = handler instanceof ChannelUpstreamHandler;
451             canHandleDownstream = handler instanceof ChannelDownstreamHandler;
452 
453 
454             if (!canHandleUpstream && !canHandleDownstream) {
455                 throw new IllegalArgumentException(
456                         "handler must be either " +
457                         ChannelUpstreamHandler.class.getName() + " or " +
458                         ChannelDownstreamHandler.class.getName() + '.');
459             }
460 
461             this.index = index;
462             this.name = name;
463             this.handler = handler;
464         }
465 
466         public Channel getChannel() {
467             return getPipeline().getChannel();
468         }
469 
470         public ChannelPipeline getPipeline() {
471             return StaticChannelPipeline.this;
472         }
473 
474         public boolean canHandleDownstream() {
475             return canHandleDownstream;
476         }
477 
478         public boolean canHandleUpstream() {
479             return canHandleUpstream;
480         }
481 
482         public ChannelHandler getHandler() {
483             return handler;
484         }
485 
486         public String getName() {
487             return name;
488         }
489 
490         public Object getAttachment() {
491             return attachment;
492         }
493 
494         public void setAttachment(Object attachment) {
495             this.attachment = attachment;
496         }
497 
498         public void sendDownstream(ChannelEvent e) {
499             StaticChannelHandlerContext prev = getActualDownstreamContext(index - 1);
500             if (prev == null) {
501                 try {
502                     getSink().eventSunk(StaticChannelPipeline.this, e);
503                 } catch (Throwable t) {
504                     notifyHandlerException(e, t);
505                 }
506             } else {
507                 StaticChannelPipeline.this.sendDownstream(prev, e);
508             }
509         }
510 
511         public void sendUpstream(ChannelEvent e) {
512             StaticChannelHandlerContext next = getActualUpstreamContext(index + 1);
513             if (next != null) {
514                 StaticChannelPipeline.this.sendUpstream(next, e);
515             }
516         }
517     }
518 }