View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
19  
20  import java.net.SocketAddress;
21  import java.nio.channels.ClosedChannelException;
22  import java.util.concurrent.atomic.AtomicInteger;
23  
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelDownstreamHandler;
26  import org.jboss.netty.channel.ChannelEvent;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.Channels;
32  import org.jboss.netty.channel.ExceptionEvent;
33  import org.jboss.netty.channel.MessageEvent;
34  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
35  
36  /**
37   * Manages streams within a SPDY session.
38   */
39  public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40          implements ChannelDownstreamHandler {
41  
42      private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43  
44      private final SpdySession spdySession = new SpdySession();
45      private volatile int lastGoodStreamID;
46  
47      private volatile int remoteConcurrentStreams;
48      private volatile int localConcurrentStreams;
49      private volatile int maxConcurrentStreams;
50  
51      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
52      private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
53      private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
54  
55      private final Object flowControlLock = new Object();
56  
57      private final AtomicInteger pings = new AtomicInteger();
58  
59      private volatile boolean sentGoAwayFrame;
60      private volatile boolean receivedGoAwayFrame;
61  
62      private volatile ChannelFuture closeSessionFuture;
63  
64      private final boolean server;
65      private final boolean flowControl;
66  
67      /**
68       * Creates a new SPDY/2 session handler.
69       *
70       * @param server {@code true} if and only if this session handler should
71       *               handle the server endpoint of the connection.
72       *               {@code false} if and only if this session handler should
73       *               handle the client endpoint of the connection.
74       */
75      @Deprecated
76      public SpdySessionHandler(boolean server) {
77          this(2, server);
78      }
79  
80      /**
81       * Creates a new session handler.
82       *
83       * @param version the protocol version
84       * @param server  {@code true} if and only if this session handler should
85       *                handle the server endpoint of the connection.
86       *                {@code false} if and only if this session handler should
87       *                handle the client endpoint of the connection.
88       */
89      public SpdySessionHandler(int version, boolean server) {
90          super();
91          if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) {
92              throw new IllegalArgumentException(
93                      "unsupported version: " + version);
94          }
95          this.server = server;
96          flowControl = version >= 3;
97      }
98  
99      @Override
100     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
101             throws Exception {
102 
103         Object msg = e.getMessage();
104         if (msg instanceof SpdyDataFrame) {
105 
106             /*
107              * SPDY Data frame processing requirements:
108              *
109              * If an endpoint receives a data frame for a Stream-ID which is not open
110              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
111              * with the error code INVALID_STREAM for the Stream-ID.
112              *
113              * If an endpoint which created the stream receives a data frame before receiving
114              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
115              * issue a stream error with the status code PROTOCOL_ERROR for the Stream-ID.
116              *
117              * If an endpoint receives multiple data frames for invalid Stream-IDs,
118              * it may close the session.
119              *
120              * If an endpoint refuses a stream it must ignore any data frames for that stream.
121              *
122              * If an endpoint receives a data frame after the stream is half-closed from the
123              * sender, it must send a RST_STREAM frame with the status STREAM_ALREADY_CLOSED.
124              *
125              * If an endpoint receives a data frame after the stream is closed, it must send
126              * a RST_STREAM frame with the status PROTOCOL_ERROR.
127              */
128 
129             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
130             int streamID = spdyDataFrame.getStreamId();
131 
132             // Check if we received a data frame for a Stream-ID which is not open
133             if (!spdySession.isActiveStream(streamID)) {
134                 if (streamID <= lastGoodStreamID) {
135                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
136                 } else if (!sentGoAwayFrame) {
137                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
138                 }
139                 return;
140             }
141 
142             // Check if we received a data frame for a stream which is half-closed
143             if (spdySession.isRemoteSideClosed(streamID)) {
144                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
145                 return;
146             }
147 
148             // Check if we received a data frame before receiving a SYN_REPLY
149             if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
150                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
151                 return;
152             }
153 
154             /*
155             * SPDY Data frame flow control processing requirements:
156             *
157             * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
158             */
159 
160             if (flowControl) {
161                 // Update receive window size
162                 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
163                 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
164 
165                 // Window size can become negative if we sent a SETTINGS frame that reduces the
166                 // size of the transfer window after the peer has written data frames.
167                 // The value is bounded by the length that SETTINGS frame decrease the window.
168                 // This difference is stored for the session when writing the SETTINGS frame
169                 // and is cleared once we send a WINDOW_UPDATE frame.
170                 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
171                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
172                     return;
173                 }
174 
175                 // Window size became negative due to sender writing frame before receiving SETTINGS
176                 // Send data frames upstream in initialReceiveWindowSize chunks
177                 if (newWindowSize < 0) {
178                     while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
179                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
180                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
181                         Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
182                     }
183                 }
184 
185                 // Send a WINDOW_UPDATE frame if less than half the window size remains
186                 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
187                     deltaWindowSize = initialReceiveWindowSize - newWindowSize;
188                     spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
189                     SpdyWindowUpdateFrame spdyWindowUpdateFrame =
190                             new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
191                     Channels.write(
192                             ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
193                 }
194             }
195 
196             // Close the remote side of the stream if this is the last frame
197             if (spdyDataFrame.isLast()) {
198                 halfCloseStream(streamID, true);
199             }
200 
201         } else if (msg instanceof SpdySynStreamFrame) {
202 
203             /*
204              * SPDY SYN_STREAM frame processing requirements:
205              *
206              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
207              * any previously received SYN_STREAM, it must issue a session error with
208              * the status PROTOCOL_ERROR.
209              *
210              * If an endpoint receives multiple SYN_STREAM frames with the same active
211              * Stream-ID, it must issue a stream error with the status code PROTOCOL_ERROR.
212              *
213              * The recipient can reject a stream by sending a stream error with the
214              * status code REFUSED_STREAM.
215              */
216 
217             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
218             int streamID = spdySynStreamFrame.getStreamId();
219 
220             // Check if we received a valid SYN_STREAM frame
221             if (spdySynStreamFrame.isInvalid() ||
222                 !isRemoteInitiatedID(streamID) ||
223                 spdySession.isActiveStream(streamID)) {
224                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
225                 return;
226             }
227 
228             // Stream-IDs must be monotonically increasing
229             if (streamID <= lastGoodStreamID) {
230                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
231                 return;
232             }
233 
234             // Try to accept the stream
235             byte priority = spdySynStreamFrame.getPriority();
236             boolean remoteSideClosed = spdySynStreamFrame.isLast();
237             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
238             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
239                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
240                 return;
241             }
242 
243         } else if (msg instanceof SpdySynReplyFrame) {
244 
245             /*
246              * SPDY SYN_REPLY frame processing requirements:
247              *
248              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
249              * it must issue a stream error with the status code STREAM_IN_USE.
250              */
251 
252             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
253             int streamID = spdySynReplyFrame.getStreamId();
254 
255             // Check if we received a valid SYN_REPLY frame
256             if (spdySynReplyFrame.isInvalid() ||
257                 isRemoteInitiatedID(streamID) ||
258                 spdySession.isRemoteSideClosed(streamID)) {
259                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
260                 return;
261             }
262 
263             // Check if we have received multiple frames for the same Stream-ID
264             if (spdySession.hasReceivedReply(streamID)) {
265                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
266                 return;
267             }
268 
269             spdySession.receivedReply(streamID);
270 
271             // Close the remote side of the stream if this is the last frame
272             if (spdySynReplyFrame.isLast()) {
273                 halfCloseStream(streamID, true);
274             }
275 
276         } else if (msg instanceof SpdyRstStreamFrame) {
277 
278             /*
279              * SPDY RST_STREAM frame processing requirements:
280              *
281              * After receiving a RST_STREAM on a stream, the receiver must not send
282              * additional frames on that stream.
283              *
284              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
285              */
286 
287             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
288             removeStream(spdyRstStreamFrame.getStreamId());
289 
290         } else if (msg instanceof SpdySettingsFrame) {
291 
292             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
293 
294             int newConcurrentStreams =
295                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
296             if (newConcurrentStreams >= 0) {
297                 updateConcurrentStreams(newConcurrentStreams, true);
298             }
299 
300             // Persistence flag are inconsistent with the use of SETTINGS to communicate
301             // the initial window size. Remove flags from the sender requesting that the
302             // value be persisted. Remove values that the sender indicates are persisted.
303             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
304                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
305             }
306             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
307 
308             if (flowControl) {
309                 int newInitialWindowSize =
310                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
311                 if (newInitialWindowSize >= 0) {
312                     updateInitialSendWindowSize(newInitialWindowSize);
313                 }
314             }
315 
316         } else if (msg instanceof SpdyPingFrame) {
317 
318             /*
319              * SPDY PING frame processing requirements:
320              *
321              * Receivers of a PING frame should send an identical frame to the sender
322              * as soon as possible.
323              *
324              * Receivers of a PING frame must ignore frames that it did not initiate
325              */
326 
327             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
328 
329             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
330                 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
331                 return;
332             }
333 
334             // Note: only checks that there are outstanding pings since uniqueness is not enforced
335             if (pings.get() == 0) {
336                 return;
337             }
338             pings.getAndDecrement();
339 
340         } else if (msg instanceof SpdyGoAwayFrame) {
341 
342             receivedGoAwayFrame = true;
343 
344         } else if (msg instanceof SpdyHeadersFrame) {
345 
346             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
347             int streamID = spdyHeadersFrame.getStreamId();
348 
349             // Check if we received a valid HEADERS frame
350             if (spdyHeadersFrame.isInvalid()) {
351                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
352                 return;
353             }
354 
355             if (spdySession.isRemoteSideClosed(streamID)) {
356                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
357                 return;
358             }
359 
360             // Close the remote side of the stream if this is the last frame
361             if (spdyHeadersFrame.isLast()) {
362                 halfCloseStream(streamID, true);
363             }
364 
365         } else if (msg instanceof SpdyWindowUpdateFrame) {
366 
367             /*
368              * SPDY WINDOW_UPDATE frame processing requirements:
369              *
370              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
371              * must send a RST_STREAM with the status code FLOW_CONTROL_ERROR.
372              *
373              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
374              * after sending the last frame for the stream.
375              */
376 
377             if (flowControl) {
378                 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
379                 int streamID = spdyWindowUpdateFrame.getStreamId();
380                 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
381 
382                 // Ignore frames for half-closed streams
383                 if (spdySession.isLocalSideClosed(streamID)) {
384                     return;
385                 }
386 
387                 // Check for numerical overflow
388                 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
389                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
390                     return;
391                 }
392 
393                 updateSendWindowSize(ctx, streamID, deltaWindowSize);
394             }
395             return;
396         }
397 
398         super.messageReceived(ctx, e);
399     }
400 
401     @Override
402     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
403             throws Exception {
404 
405         Throwable cause = e.getCause();
406         if (cause instanceof SpdyProtocolException) {
407             issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
408         }
409 
410         super.exceptionCaught(ctx, e);
411     }
412 
413     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
414             throws Exception {
415         if (evt instanceof ChannelStateEvent) {
416             ChannelStateEvent e = (ChannelStateEvent) evt;
417             switch (e.getState()) {
418             case OPEN:
419             case CONNECTED:
420             case BOUND:
421 
422                 /*
423                  * SPDY connection requirements:
424                  *
425                  * When either endpoint closes the transport-level connection,
426                  * it must first send a GOAWAY frame.
427                  */
428                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
429                     sendGoAwayFrame(ctx, e);
430                     return;
431                 }
432             }
433         }
434         if (!(evt instanceof MessageEvent)) {
435             ctx.sendDownstream(evt);
436             return;
437         }
438 
439         MessageEvent e = (MessageEvent) evt;
440         Object msg = e.getMessage();
441 
442         if (msg instanceof SpdyDataFrame) {
443 
444             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
445             final int streamID = spdyDataFrame.getStreamId();
446 
447             // Frames must not be sent on half-closed streams
448             if (spdySession.isLocalSideClosed(streamID)) {
449                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
450                 return;
451             }
452 
453             /*
454              * SPDY Data frame flow control processing requirements:
455              *
456              * Sender must not send a data frame with data length greater
457              * than the transfer window size.
458              *
459              * After sending each data frame, the sender decrements its
460              * transfer window size by the amount of data transmitted.
461              *
462              * When the window size becomes less than or equal to 0, the
463              * sender must pause transmitting data frames.
464              */
465 
466             if (flowControl) {
467                 synchronized (flowControlLock) {
468                     int dataLength = spdyDataFrame.getData().readableBytes();
469                     int sendWindowSize = spdySession.getSendWindowSize(streamID);
470 
471                     if (sendWindowSize >= dataLength) {
472                         // Window size is large enough to send entire data frame
473                         spdySession.updateSendWindowSize(streamID, -1 * dataLength);
474 
475                         // The transfer window size is pre-decremented when sending a data frame downstream.
476                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
477                         final SocketAddress remoteAddress = e.getRemoteAddress();
478                         final ChannelHandlerContext context = ctx;
479                         e.getFuture().addListener(new ChannelFutureListener() {
480                             public void operationComplete(ChannelFuture future) throws Exception {
481                                 if (!future.isSuccess()) {
482                                     issueStreamError(
483                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
484                                 }
485                             }
486                         });
487 
488                     } else if (sendWindowSize > 0) {
489                         // Stream is not stalled but we cannot send the entire frame
490                         spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
491 
492                         // Create a partial data frame whose length is the current window size
493                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
494                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
495 
496                         // Enqueue the remaining data (will be the first frame queued)
497                         spdySession.putPendingWrite(streamID, e);
498 
499                         ChannelFuture writeFuture = Channels.future(e.getChannel());
500 
501                         // The transfer window size is pre-decremented when sending a data frame downstream.
502                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
503                         final SocketAddress remoteAddress = e.getRemoteAddress();
504                         final ChannelHandlerContext context = ctx;
505                         e.getFuture().addListener(new ChannelFutureListener() {
506                             public void operationComplete(ChannelFuture future) throws Exception {
507                                 if (!future.isSuccess()) {
508                                     issueStreamError(
509                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
510                                 }
511                             }
512                         });
513 
514                         Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
515                         return;
516 
517                     } else {
518                         // Stream is stalled -- enqueue Data frame and return
519                         spdySession.putPendingWrite(streamID, e);
520                         return;
521                     }
522                 }
523             }
524 
525             // Close the local side of the stream if this is the last frame
526             if (spdyDataFrame.isLast()) {
527                 halfCloseStream(streamID, false);
528             }
529 
530         } else if (msg instanceof SpdySynStreamFrame) {
531 
532             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
533             int streamID = spdySynStreamFrame.getStreamId();
534 
535             if (isRemoteInitiatedID(streamID)) {
536                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
537                 return;
538             }
539 
540             byte priority = spdySynStreamFrame.getPriority();
541             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
542             boolean localSideClosed = spdySynStreamFrame.isLast();
543             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
544                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
545                 return;
546             }
547 
548         } else if (msg instanceof SpdySynReplyFrame) {
549 
550             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
551             int streamID = spdySynReplyFrame.getStreamId();
552 
553             // Frames must not be sent on half-closed streams
554             if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
555                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
556                 return;
557             }
558 
559             // Close the local side of the stream if this is the last frame
560             if (spdySynReplyFrame.isLast()) {
561                 halfCloseStream(streamID, false);
562             }
563 
564         } else if (msg instanceof SpdyRstStreamFrame) {
565 
566             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
567             removeStream(spdyRstStreamFrame.getStreamId());
568 
569         } else if (msg instanceof SpdySettingsFrame) {
570 
571             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
572 
573             int newConcurrentStreams =
574                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
575             if (newConcurrentStreams >= 0) {
576                 updateConcurrentStreams(newConcurrentStreams, false);
577             }
578 
579             // Persistence flag are inconsistent with the use of SETTINGS to communicate
580             // the initial window size. Remove flags from the sender requesting that the
581             // value be persisted. Remove values that the sender indicates are persisted.
582             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
583                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
584             }
585             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
586 
587             if (flowControl) {
588                 int newInitialWindowSize =
589                         spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
590                 if (newInitialWindowSize >= 0) {
591                     updateInitialReceiveWindowSize(newInitialWindowSize);
592                 }
593             }
594 
595         } else if (msg instanceof SpdyPingFrame) {
596 
597             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
598             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
599                 e.getFuture().setFailure(new IllegalArgumentException(
600                             "invalid PING ID: " + spdyPingFrame.getId()));
601                 return;
602             }
603             pings.getAndIncrement();
604 
605         } else if (msg instanceof SpdyGoAwayFrame) {
606 
607             // Why is this being sent? Intercept it and fail the write.
608             // Should have sent a CLOSE ChannelStateEvent
609             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
610             return;
611 
612         } else if (msg instanceof SpdyHeadersFrame) {
613 
614             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
615             int streamID = spdyHeadersFrame.getStreamId();
616 
617             // Frames must not be sent on half-closed streams
618             if (spdySession.isLocalSideClosed(streamID)) {
619                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
620                 return;
621             }
622 
623             // Close the local side of the stream if this is the last frame
624             if (spdyHeadersFrame.isLast()) {
625                 halfCloseStream(streamID, false);
626             }
627 
628         } else if (msg instanceof SpdyWindowUpdateFrame) {
629 
630             // Why is this being sent? Intercept it and fail the write.
631             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
632             return;
633         }
634 
635         ctx.sendDownstream(evt);
636     }
637 
638     /*
639      * SPDY Session Error Handling:
640      *
641      * When a session error occurs, the endpoint encountering the error must first
642      * send a GOAWAY frame with the Stream-ID of the most recently received stream
643      * from the remote endpoint, and the error code for why the session is terminating.
644      *
645      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
646      */
647     private void issueSessionError(
648             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
649 
650         ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
651         future.addListener(ChannelFutureListener.CLOSE);
652     }
653 
654     /*
655      * SPDY Stream Error Handling:
656      *
657      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
658      * the Stream-ID for the stream where the error occurred and the error status which
659      * caused the error.
660      *
661      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
662      *
663      * Note: this is only called by the worker thread
664      */
665     private void issueStreamError(
666             ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
667 
668         boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
669         removeStream(streamID);
670 
671         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
672         Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
673         if (fireMessageReceived) {
674             Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
675         }
676     }
677 
678     /*
679      * Helper functions
680      */
681 
682     private boolean isRemoteInitiatedID(int ID) {
683         boolean serverID = SpdyCodecUtil.isServerId(ID);
684         return server && !serverID || !server && serverID;
685     }
686 
687     private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
688         if (remote) {
689             remoteConcurrentStreams = newConcurrentStreams;
690         } else {
691             localConcurrentStreams = newConcurrentStreams;
692         }
693         if (localConcurrentStreams == remoteConcurrentStreams) {
694             maxConcurrentStreams = localConcurrentStreams;
695             return;
696         }
697         if (localConcurrentStreams == 0) {
698             maxConcurrentStreams = remoteConcurrentStreams;
699             return;
700         }
701         if (remoteConcurrentStreams == 0) {
702             maxConcurrentStreams = localConcurrentStreams;
703             return;
704         }
705         if (localConcurrentStreams > remoteConcurrentStreams) {
706             maxConcurrentStreams = remoteConcurrentStreams;
707         } else {
708             maxConcurrentStreams = localConcurrentStreams;
709         }
710     }
711 
712     // need to synchronize to prevent new streams from being created while updating active streams
713     private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
714         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
715         initialSendWindowSize = newInitialWindowSize;
716         for (Integer StreamID: spdySession.getActiveStreams()) {
717             spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
718         }
719     }
720 
721     // need to synchronize to prevent new streams from being created while updating active streams
722     private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
723         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
724         initialReceiveWindowSize = newInitialWindowSize;
725         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
726     }
727 
728     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamID, and initial window sizes
729     private synchronized boolean acceptStream(
730             int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
731         // Cannot initiate any new streams after receiving or sending GOAWAY
732         if (receivedGoAwayFrame || sentGoAwayFrame) {
733             return false;
734         }
735 
736         int maxConcurrentStreams = this.maxConcurrentStreams; // read volatile once
737         if (maxConcurrentStreams != 0 &&
738            spdySession.numActiveStreams() >= maxConcurrentStreams) {
739             return false;
740         }
741         spdySession.acceptStream(
742                 streamID, priority, remoteSideClosed, localSideClosed,
743                 initialSendWindowSize, initialReceiveWindowSize);
744         if (isRemoteInitiatedID(streamID)) {
745             lastGoodStreamID = streamID;
746         }
747         return true;
748     }
749 
750     private void halfCloseStream(int streamID, boolean remote) {
751         if (remote) {
752             spdySession.closeRemoteSide(streamID);
753         } else {
754             spdySession.closeLocalSide(streamID);
755         }
756         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
757             closeSessionFuture.setSuccess();
758         }
759     }
760 
761     private void removeStream(int streamID) {
762         spdySession.removeStream(streamID);
763         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
764             closeSessionFuture.setSuccess();
765         }
766     }
767 
768     private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
769         synchronized (flowControlLock) {
770             int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
771 
772             while (newWindowSize > 0) {
773                 // Check if we have unblocked a stalled stream
774                 MessageEvent e = spdySession.getPendingWrite(streamID);
775                 if (e == null) {
776                     break;
777                 }
778 
779                 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
780                 int dataFrameSize = spdyDataFrame.getData().readableBytes();
781 
782                 if (newWindowSize >= dataFrameSize) {
783                     // Window size is large enough to send entire data frame
784                     spdySession.removePendingWrite(streamID);
785                     newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
786 
787                     // The transfer window size is pre-decremented when sending a data frame downstream.
788                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
789                     final SocketAddress remoteAddress = e.getRemoteAddress();
790                     final ChannelHandlerContext context = ctx;
791                     e.getFuture().addListener(new ChannelFutureListener() {
792                         public void operationComplete(ChannelFuture future) throws Exception {
793                             if (!future.isSuccess()) {
794                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
795                             }
796                         }
797                     });
798 
799                     // Close the local side of the stream if this is the last frame
800                     if (spdyDataFrame.isLast()) {
801                         halfCloseStream(streamID, false);
802                     }
803 
804                     Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
805 
806                 } else {
807                     // We can send a partial frame
808                     spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
809 
810                     // Create a partial data frame whose length is the current window size
811                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
812                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
813 
814                     ChannelFuture writeFuture = Channels.future(e.getChannel());
815 
816                     // The transfer window size is pre-decremented when sending a data frame downstream.
817                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
818                     final SocketAddress remoteAddress = e.getRemoteAddress();
819                     final ChannelHandlerContext context = ctx;
820                     e.getFuture().addListener(new ChannelFutureListener() {
821                         public void operationComplete(ChannelFuture future) throws Exception {
822                             if (!future.isSuccess()) {
823                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
824                             }
825                         }
826                     });
827 
828                     Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
829 
830                     newWindowSize = 0;
831                 }
832             }
833         }
834     }
835 
836     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
837         // Avoid NotYetConnectedException
838         if (!e.getChannel().isConnected()) {
839             ctx.sendDownstream(e);
840             return;
841         }
842 
843         ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
844         if (spdySession.noActiveStreams()) {
845             future.addListener(new ClosingChannelFutureListener(ctx, e));
846         } else {
847             closeSessionFuture = Channels.future(e.getChannel());
848             closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
849         }
850     }
851 
852     private synchronized ChannelFuture sendGoAwayFrame(
853             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
854         if (!sentGoAwayFrame) {
855             sentGoAwayFrame = true;
856             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
857             ChannelFuture future = Channels.future(channel);
858             Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
859             return future;
860         }
861         return Channels.succeededFuture(channel);
862     }
863 
864     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
865         private final ChannelHandlerContext ctx;
866         private final ChannelStateEvent e;
867 
868         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
869             this.ctx = ctx;
870             this.e = e;
871         }
872 
873         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
874             if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
875                 Channels.close(ctx, e.getFuture());
876             } else {
877                 e.getFuture().setSuccess();
878             }
879         }
880     }
881 }