1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
19
20 import java.net.SocketAddress;
21 import java.nio.channels.ClosedChannelException;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelDownstreamHandler;
26 import org.jboss.netty.channel.ChannelEvent;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.Channels;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
35
36
37
38
39 public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40 implements ChannelDownstreamHandler {
41
42 private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43
44 private final SpdySession spdySession = new SpdySession();
45 private volatile int lastGoodStreamID;
46
47 private volatile int remoteConcurrentStreams;
48 private volatile int localConcurrentStreams;
49 private volatile int maxConcurrentStreams;
50
51 private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
52 private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
53 private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
54
55 private final Object flowControlLock = new Object();
56
57 private final AtomicInteger pings = new AtomicInteger();
58
59 private volatile boolean sentGoAwayFrame;
60 private volatile boolean receivedGoAwayFrame;
61
62 private volatile ChannelFuture closeSessionFuture;
63
64 private final boolean server;
65 private final boolean flowControl;
66
67
68
69
70
71
72
73
74
/**
 * Creates a session handler that speaks protocol version 2.
 *
 * @param server role flag stored by the handler; it is compared with the
 *               server-ness of stream and ping IDs to decide which side
 *               initiated them
 * @deprecated use {@link #SpdySessionHandler(int, boolean)} and state the
 *             protocol version explicitly
 */
@Deprecated
public SpdySessionHandler(boolean server) {
    this(2, server);
}
79
80
81
82
83
84
85
86
87
88
/**
 * Creates a session handler for the given protocol version.
 *
 * @param version the protocol version; must lie within
 *                {@code SPDY_MIN_VERSION..SPDY_MAX_VERSION}
 * @param server  role flag stored by the handler; it is compared with the
 *                server-ness of stream and ping IDs to decide which side
 *                initiated them
 * @throws IllegalArgumentException if {@code version} is outside the
 *                                  supported range
 */
public SpdySessionHandler(int version, boolean server) {
    boolean supported = version >= SPDY_MIN_VERSION && version <= SPDY_MAX_VERSION;
    if (!supported) {
        throw new IllegalArgumentException("unsupported version: " + version);
    }
    this.server = server;
    // Flow control is only applied from protocol version 3 onwards.
    this.flowControl = version >= 3;
}
98
99 @Override
100 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
101 throws Exception {
102
103 Object msg = e.getMessage();
104 if (msg instanceof SpdyDataFrame) {
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
130 int streamID = spdyDataFrame.getStreamId();
131
132
133 if (!spdySession.isActiveStream(streamID)) {
134 if (streamID <= lastGoodStreamID) {
135 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
136 } else if (!sentGoAwayFrame) {
137 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
138 }
139 return;
140 }
141
142
143 if (spdySession.isRemoteSideClosed(streamID)) {
144 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
145 return;
146 }
147
148
149 if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
150 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
151 return;
152 }
153
154
155
156
157
158
159
160 if (flowControl) {
161
162 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
163 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
164
165
166
167
168
169
170 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
171 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
172 return;
173 }
174
175
176
177 if (newWindowSize < 0) {
178 while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
179 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
180 partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
181 Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
182 }
183 }
184
185
186 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
187 deltaWindowSize = initialReceiveWindowSize - newWindowSize;
188 spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
189 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
190 new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
191 Channels.write(
192 ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
193 }
194 }
195
196
197 if (spdyDataFrame.isLast()) {
198 halfCloseStream(streamID, true);
199 }
200
201 } else if (msg instanceof SpdySynStreamFrame) {
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
218 int streamID = spdySynStreamFrame.getStreamId();
219
220
221 if (spdySynStreamFrame.isInvalid() ||
222 !isRemoteInitiatedID(streamID) ||
223 spdySession.isActiveStream(streamID)) {
224 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
225 return;
226 }
227
228
229 if (streamID <= lastGoodStreamID) {
230 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
231 return;
232 }
233
234
235 byte priority = spdySynStreamFrame.getPriority();
236 boolean remoteSideClosed = spdySynStreamFrame.isLast();
237 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
238 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
239 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
240 return;
241 }
242
243 } else if (msg instanceof SpdySynReplyFrame) {
244
245
246
247
248
249
250
251
252 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
253 int streamID = spdySynReplyFrame.getStreamId();
254
255
256 if (spdySynReplyFrame.isInvalid() ||
257 isRemoteInitiatedID(streamID) ||
258 spdySession.isRemoteSideClosed(streamID)) {
259 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
260 return;
261 }
262
263
264 if (spdySession.hasReceivedReply(streamID)) {
265 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
266 return;
267 }
268
269 spdySession.receivedReply(streamID);
270
271
272 if (spdySynReplyFrame.isLast()) {
273 halfCloseStream(streamID, true);
274 }
275
276 } else if (msg instanceof SpdyRstStreamFrame) {
277
278
279
280
281
282
283
284
285
286
287 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
288 removeStream(spdyRstStreamFrame.getStreamId());
289
290 } else if (msg instanceof SpdySettingsFrame) {
291
292 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
293
294 int newConcurrentStreams =
295 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
296 if (newConcurrentStreams >= 0) {
297 updateConcurrentStreams(newConcurrentStreams, true);
298 }
299
300
301
302
303 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
304 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
305 }
306 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
307
308 if (flowControl) {
309 int newInitialWindowSize =
310 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
311 if (newInitialWindowSize >= 0) {
312 updateInitialSendWindowSize(newInitialWindowSize);
313 }
314 }
315
316 } else if (msg instanceof SpdyPingFrame) {
317
318
319
320
321
322
323
324
325
326
327 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
328
329 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
330 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
331 return;
332 }
333
334
335 if (pings.get() == 0) {
336 return;
337 }
338 pings.getAndDecrement();
339
340 } else if (msg instanceof SpdyGoAwayFrame) {
341
342 receivedGoAwayFrame = true;
343
344 } else if (msg instanceof SpdyHeadersFrame) {
345
346 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
347 int streamID = spdyHeadersFrame.getStreamId();
348
349
350 if (spdyHeadersFrame.isInvalid()) {
351 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
352 return;
353 }
354
355 if (spdySession.isRemoteSideClosed(streamID)) {
356 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
357 return;
358 }
359
360
361 if (spdyHeadersFrame.isLast()) {
362 halfCloseStream(streamID, true);
363 }
364
365 } else if (msg instanceof SpdyWindowUpdateFrame) {
366
367
368
369
370
371
372
373
374
375
376
377 if (flowControl) {
378 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
379 int streamID = spdyWindowUpdateFrame.getStreamId();
380 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
381
382
383 if (spdySession.isLocalSideClosed(streamID)) {
384 return;
385 }
386
387
388 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
389 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
390 return;
391 }
392
393 updateSendWindowSize(ctx, streamID, deltaWindowSize);
394 }
395 return;
396 }
397
398 super.messageReceived(ctx, e);
399 }
400
401 @Override
402 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
403 throws Exception {
404
405 Throwable cause = e.getCause();
406 if (cause instanceof SpdyProtocolException) {
407 issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
408 }
409
410 super.exceptionCaught(ctx, e);
411 }
412
413 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
414 throws Exception {
415 if (evt instanceof ChannelStateEvent) {
416 ChannelStateEvent e = (ChannelStateEvent) evt;
417 switch (e.getState()) {
418 case OPEN:
419 case CONNECTED:
420 case BOUND:
421
422
423
424
425
426
427
428 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
429 sendGoAwayFrame(ctx, e);
430 return;
431 }
432 }
433 }
434 if (!(evt instanceof MessageEvent)) {
435 ctx.sendDownstream(evt);
436 return;
437 }
438
439 MessageEvent e = (MessageEvent) evt;
440 Object msg = e.getMessage();
441
442 if (msg instanceof SpdyDataFrame) {
443
444 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
445 final int streamID = spdyDataFrame.getStreamId();
446
447
448 if (spdySession.isLocalSideClosed(streamID)) {
449 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
450 return;
451 }
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466 if (flowControl) {
467 synchronized (flowControlLock) {
468 int dataLength = spdyDataFrame.getData().readableBytes();
469 int sendWindowSize = spdySession.getSendWindowSize(streamID);
470
471 if (sendWindowSize >= dataLength) {
472
473 spdySession.updateSendWindowSize(streamID, -1 * dataLength);
474
475
476
477 final SocketAddress remoteAddress = e.getRemoteAddress();
478 final ChannelHandlerContext context = ctx;
479 e.getFuture().addListener(new ChannelFutureListener() {
480 public void operationComplete(ChannelFuture future) throws Exception {
481 if (!future.isSuccess()) {
482 issueStreamError(
483 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
484 }
485 }
486 });
487
488 } else if (sendWindowSize > 0) {
489
490 spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
491
492
493 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
494 partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
495
496
497 spdySession.putPendingWrite(streamID, e);
498
499 ChannelFuture writeFuture = Channels.future(e.getChannel());
500
501
502
503 final SocketAddress remoteAddress = e.getRemoteAddress();
504 final ChannelHandlerContext context = ctx;
505 e.getFuture().addListener(new ChannelFutureListener() {
506 public void operationComplete(ChannelFuture future) throws Exception {
507 if (!future.isSuccess()) {
508 issueStreamError(
509 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
510 }
511 }
512 });
513
514 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
515 return;
516
517 } else {
518
519 spdySession.putPendingWrite(streamID, e);
520 return;
521 }
522 }
523 }
524
525
526 if (spdyDataFrame.isLast()) {
527 halfCloseStream(streamID, false);
528 }
529
530 } else if (msg instanceof SpdySynStreamFrame) {
531
532 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
533 int streamID = spdySynStreamFrame.getStreamId();
534
535 if (isRemoteInitiatedID(streamID)) {
536 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
537 return;
538 }
539
540 byte priority = spdySynStreamFrame.getPriority();
541 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
542 boolean localSideClosed = spdySynStreamFrame.isLast();
543 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
544 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
545 return;
546 }
547
548 } else if (msg instanceof SpdySynReplyFrame) {
549
550 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
551 int streamID = spdySynReplyFrame.getStreamId();
552
553
554 if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
555 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
556 return;
557 }
558
559
560 if (spdySynReplyFrame.isLast()) {
561 halfCloseStream(streamID, false);
562 }
563
564 } else if (msg instanceof SpdyRstStreamFrame) {
565
566 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
567 removeStream(spdyRstStreamFrame.getStreamId());
568
569 } else if (msg instanceof SpdySettingsFrame) {
570
571 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
572
573 int newConcurrentStreams =
574 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
575 if (newConcurrentStreams >= 0) {
576 updateConcurrentStreams(newConcurrentStreams, false);
577 }
578
579
580
581
582 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
583 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
584 }
585 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
586
587 if (flowControl) {
588 int newInitialWindowSize =
589 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
590 if (newInitialWindowSize >= 0) {
591 updateInitialReceiveWindowSize(newInitialWindowSize);
592 }
593 }
594
595 } else if (msg instanceof SpdyPingFrame) {
596
597 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
598 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
599 e.getFuture().setFailure(new IllegalArgumentException(
600 "invalid PING ID: " + spdyPingFrame.getId()));
601 return;
602 }
603 pings.getAndIncrement();
604
605 } else if (msg instanceof SpdyGoAwayFrame) {
606
607
608
609 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
610 return;
611
612 } else if (msg instanceof SpdyHeadersFrame) {
613
614 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
615 int streamID = spdyHeadersFrame.getStreamId();
616
617
618 if (spdySession.isLocalSideClosed(streamID)) {
619 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
620 return;
621 }
622
623
624 if (spdyHeadersFrame.isLast()) {
625 halfCloseStream(streamID, false);
626 }
627
628 } else if (msg instanceof SpdyWindowUpdateFrame) {
629
630
631 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
632 return;
633 }
634
635 ctx.sendDownstream(evt);
636 }
637
638
639
640
641
642
643
644
645
646
647 private void issueSessionError(
648 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
649
650 ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
651 future.addListener(ChannelFutureListener.CLOSE);
652 }
653
654
655
656
657
658
659
660
661
662
663
664
665 private void issueStreamError(
666 ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
667
668 boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
669 removeStream(streamID);
670
671 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
672 Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
673 if (fireMessageReceived) {
674 Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
675 }
676 }
677
678
679
680
681
682 private boolean isRemoteInitiatedID(int ID) {
683 boolean serverID = SpdyCodecUtil.isServerId(ID);
684 return server && !serverID || !server && serverID;
685 }
686
687 private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
688 if (remote) {
689 remoteConcurrentStreams = newConcurrentStreams;
690 } else {
691 localConcurrentStreams = newConcurrentStreams;
692 }
693 if (localConcurrentStreams == remoteConcurrentStreams) {
694 maxConcurrentStreams = localConcurrentStreams;
695 return;
696 }
697 if (localConcurrentStreams == 0) {
698 maxConcurrentStreams = remoteConcurrentStreams;
699 return;
700 }
701 if (remoteConcurrentStreams == 0) {
702 maxConcurrentStreams = localConcurrentStreams;
703 return;
704 }
705 if (localConcurrentStreams > remoteConcurrentStreams) {
706 maxConcurrentStreams = remoteConcurrentStreams;
707 } else {
708 maxConcurrentStreams = localConcurrentStreams;
709 }
710 }
711
712
713 private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
714 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
715 initialSendWindowSize = newInitialWindowSize;
716 for (Integer StreamID: spdySession.getActiveStreams()) {
717 spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
718 }
719 }
720
721
722 private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
723 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
724 initialReceiveWindowSize = newInitialWindowSize;
725 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
726 }
727
728
729 private synchronized boolean acceptStream(
730 int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
731
732 if (receivedGoAwayFrame || sentGoAwayFrame) {
733 return false;
734 }
735
736 int maxConcurrentStreams = this.maxConcurrentStreams;
737 if (maxConcurrentStreams != 0 &&
738 spdySession.numActiveStreams() >= maxConcurrentStreams) {
739 return false;
740 }
741 spdySession.acceptStream(
742 streamID, priority, remoteSideClosed, localSideClosed,
743 initialSendWindowSize, initialReceiveWindowSize);
744 if (isRemoteInitiatedID(streamID)) {
745 lastGoodStreamID = streamID;
746 }
747 return true;
748 }
749
750 private void halfCloseStream(int streamID, boolean remote) {
751 if (remote) {
752 spdySession.closeRemoteSide(streamID);
753 } else {
754 spdySession.closeLocalSide(streamID);
755 }
756 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
757 closeSessionFuture.setSuccess();
758 }
759 }
760
761 private void removeStream(int streamID) {
762 spdySession.removeStream(streamID);
763 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
764 closeSessionFuture.setSuccess();
765 }
766 }
767
768 private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
769 synchronized (flowControlLock) {
770 int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
771
772 while (newWindowSize > 0) {
773
774 MessageEvent e = spdySession.getPendingWrite(streamID);
775 if (e == null) {
776 break;
777 }
778
779 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
780 int dataFrameSize = spdyDataFrame.getData().readableBytes();
781
782 if (newWindowSize >= dataFrameSize) {
783
784 spdySession.removePendingWrite(streamID);
785 newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
786
787
788
789 final SocketAddress remoteAddress = e.getRemoteAddress();
790 final ChannelHandlerContext context = ctx;
791 e.getFuture().addListener(new ChannelFutureListener() {
792 public void operationComplete(ChannelFuture future) throws Exception {
793 if (!future.isSuccess()) {
794 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
795 }
796 }
797 });
798
799
800 if (spdyDataFrame.isLast()) {
801 halfCloseStream(streamID, false);
802 }
803
804 Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
805
806 } else {
807
808 spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
809
810
811 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
812 partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
813
814 ChannelFuture writeFuture = Channels.future(e.getChannel());
815
816
817
818 final SocketAddress remoteAddress = e.getRemoteAddress();
819 final ChannelHandlerContext context = ctx;
820 e.getFuture().addListener(new ChannelFutureListener() {
821 public void operationComplete(ChannelFuture future) throws Exception {
822 if (!future.isSuccess()) {
823 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
824 }
825 }
826 });
827
828 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
829
830 newWindowSize = 0;
831 }
832 }
833 }
834 }
835
836 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
837
838 if (!e.getChannel().isConnected()) {
839 ctx.sendDownstream(e);
840 return;
841 }
842
843 ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
844 if (spdySession.noActiveStreams()) {
845 future.addListener(new ClosingChannelFutureListener(ctx, e));
846 } else {
847 closeSessionFuture = Channels.future(e.getChannel());
848 closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
849 }
850 }
851
852 private synchronized ChannelFuture sendGoAwayFrame(
853 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
854 if (!sentGoAwayFrame) {
855 sentGoAwayFrame = true;
856 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
857 ChannelFuture future = Channels.future(channel);
858 Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
859 return future;
860 }
861 return Channels.succeededFuture(channel);
862 }
863
864 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
865 private final ChannelHandlerContext ctx;
866 private final ChannelStateEvent e;
867
868 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
869 this.ctx = ctx;
870 this.e = e;
871 }
872
873 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
874 if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
875 Channels.close(ctx, e.getFuture());
876 } else {
877 e.getFuture().setSuccess();
878 }
879 }
880 }
881 }