1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import java.util.Comparator;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.TreeSet;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.jboss.netty.channel.MessageEvent;
27
28 final class SpdySession {
29
30 private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
31
32 private final Map<Integer, StreamState> activeStreams =
33 new ConcurrentHashMap<Integer, StreamState>();
34
35 SpdySession() {
36 }
37
38 int numActiveStreams() {
39 return activeStreams.size();
40 }
41
42 boolean noActiveStreams() {
43 return activeStreams.isEmpty();
44 }
45
46 boolean isActiveStream(int streamID) {
47 return activeStreams.containsKey(streamID);
48 }
49
50
51 Set<Integer> getActiveStreams() {
52 TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
53 StreamIDs.addAll(activeStreams.keySet());
54 return StreamIDs;
55 }
56
57 void acceptStream(
58 int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
59 int sendWindowSize, int receiveWindowSize) {
60 if (!remoteSideClosed || !localSideClosed) {
61 activeStreams.put(
62 streamID,
63 new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
64 }
65 }
66
67 void removeStream(int streamID) {
68 Integer StreamID = streamID;
69 StreamState state = activeStreams.get(StreamID);
70 activeStreams.remove(StreamID);
71 if (state != null) {
72 MessageEvent e = state.removePendingWrite();
73 while (e != null) {
74 e.getFuture().setFailure(STREAM_CLOSED);
75 e = state.removePendingWrite();
76 }
77 }
78 }
79
80 boolean isRemoteSideClosed(int streamID) {
81 StreamState state = activeStreams.get(streamID);
82 return state == null || state.isRemoteSideClosed();
83 }
84
85 void closeRemoteSide(int streamID) {
86 Integer StreamID = streamID;
87 StreamState state = activeStreams.get(StreamID);
88 if (state != null) {
89 state.closeRemoteSide();
90 if (state.isLocalSideClosed()) {
91 activeStreams.remove(StreamID);
92 }
93 }
94 }
95
96 boolean isLocalSideClosed(int streamID) {
97 StreamState state = activeStreams.get(streamID);
98 return state == null || state.isLocalSideClosed();
99 }
100
101 void closeLocalSide(int streamID) {
102 Integer StreamID = streamID;
103 StreamState state = activeStreams.get(StreamID);
104 if (state != null) {
105 state.closeLocalSide();
106 if (state.isRemoteSideClosed()) {
107 activeStreams.remove(StreamID);
108 }
109 }
110 }
111
112
113
114
115
116
117 boolean hasReceivedReply(int streamID) {
118 StreamState state = activeStreams.get(streamID);
119 return state != null && state.hasReceivedReply();
120 }
121
122 void receivedReply(int streamID) {
123 StreamState state = activeStreams.get(streamID);
124 if (state != null) {
125 state.receivedReply();
126 }
127 }
128
129 int getSendWindowSize(int streamID) {
130 StreamState state = activeStreams.get(streamID);
131 return state != null ? state.getSendWindowSize() : -1;
132 }
133
134 int updateSendWindowSize(int streamID, int deltaWindowSize) {
135 StreamState state = activeStreams.get(streamID);
136 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
137 }
138
139 int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
140 StreamState state = activeStreams.get(streamID);
141 if (deltaWindowSize > 0) {
142 state.setReceiveWindowSizeLowerBound(0);
143 }
144 return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
145 }
146
147 int getReceiveWindowSizeLowerBound(int streamID) {
148 StreamState state = activeStreams.get(streamID);
149 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
150 }
151
152 void updateAllReceiveWindowSizes(int deltaWindowSize) {
153 for (StreamState state: activeStreams.values()) {
154 state.updateReceiveWindowSize(deltaWindowSize);
155 if (deltaWindowSize < 0) {
156 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
157 }
158 }
159 }
160
161 boolean putPendingWrite(int streamID, MessageEvent evt) {
162 StreamState state = activeStreams.get(streamID);
163 return state != null && state.putPendingWrite(evt);
164 }
165
166 MessageEvent getPendingWrite(int streamID) {
167 StreamState state = activeStreams.get(streamID);
168 return state != null ? state.getPendingWrite() : null;
169 }
170
171 MessageEvent removePendingWrite(int streamID) {
172 StreamState state = activeStreams.get(streamID);
173 return state != null ? state.removePendingWrite() : null;
174 }
175
176 private static final class StreamState {
177
178 private final byte priority;
179 private volatile boolean remoteSideClosed;
180 private volatile boolean localSideClosed;
181 private boolean receivedReply;
182 private final AtomicInteger sendWindowSize;
183 private final AtomicInteger receiveWindowSize;
184 private volatile int receiveWindowSizeLowerBound;
185 private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
186 new ConcurrentLinkedQueue<MessageEvent>();
187
188 StreamState(
189 byte priority, boolean remoteSideClosed, boolean localSideClosed,
190 int sendWindowSize, int receiveWindowSize) {
191 this.priority = priority;
192 this.remoteSideClosed = remoteSideClosed;
193 this.localSideClosed = localSideClosed;
194 this.sendWindowSize = new AtomicInteger(sendWindowSize);
195 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
196 }
197
198 byte getPriority() {
199 return priority;
200 }
201
202 boolean isRemoteSideClosed() {
203 return remoteSideClosed;
204 }
205
206 void closeRemoteSide() {
207 remoteSideClosed = true;
208 }
209
210 boolean isLocalSideClosed() {
211 return localSideClosed;
212 }
213
214 void closeLocalSide() {
215 localSideClosed = true;
216 }
217
218 boolean hasReceivedReply() {
219 return receivedReply;
220 }
221
222 void receivedReply() {
223 receivedReply = true;
224 }
225
226 int getSendWindowSize() {
227 return sendWindowSize.get();
228 }
229
230 int updateSendWindowSize(int deltaWindowSize) {
231 return sendWindowSize.addAndGet(deltaWindowSize);
232 }
233
234 int updateReceiveWindowSize(int deltaWindowSize) {
235 return receiveWindowSize.addAndGet(deltaWindowSize);
236 }
237
238 int getReceiveWindowSizeLowerBound() {
239 return receiveWindowSizeLowerBound;
240 }
241
242 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
243 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
244 }
245
246 boolean putPendingWrite(MessageEvent evt) {
247 return pendingWriteQueue.offer(evt);
248 }
249
250 MessageEvent getPendingWrite() {
251 return pendingWriteQueue.peek();
252 }
253
254 MessageEvent removePendingWrite() {
255 return pendingWriteQueue.poll();
256 }
257 }
258
259 private final class PriorityComparator implements Comparator<Integer> {
260
261 PriorityComparator() {
262 super();
263 }
264
265 public int compare(Integer id1, Integer id2) {
266 StreamState state1 = activeStreams.get(id1);
267 StreamState state2 = activeStreams.get(id2);
268 return state1.getPriority() - state2.getPriority();
269 }
270 }
271 }