1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import static java.util.concurrent.TimeUnit.*;
19
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.LinkedHashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34 import org.jboss.netty.util.internal.IoWorkerRunnable;
35
36
37
38
39
40
41
42
43
44 public class DefaultChannelGroupFuture implements ChannelGroupFuture {
45
46 private static final InternalLogger logger =
47 InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
48
49 private final ChannelGroup group;
50 final Map<Integer, ChannelFuture> futures;
51 private ChannelGroupFutureListener firstListener;
52 private List<ChannelGroupFutureListener> otherListeners;
53 private boolean done;
54 int successCount;
55 int failureCount;
56 private int waiters;
57
58 private final ChannelFutureListener childListener = new ChannelFutureListener() {
59 public void operationComplete(ChannelFuture future) throws Exception {
60 boolean success = future.isSuccess();
61 boolean callSetDone = false;
62 synchronized (DefaultChannelGroupFuture.this) {
63 if (success) {
64 successCount ++;
65 } else {
66 failureCount ++;
67 }
68
69 callSetDone = successCount + failureCount == futures.size();
70 assert successCount + failureCount <= futures.size();
71 }
72
73 if (callSetDone) {
74 setDone();
75 }
76 }
77 };
78
79
80
81
82 public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
83 if (group == null) {
84 throw new NullPointerException("group");
85 }
86 if (futures == null) {
87 throw new NullPointerException("futures");
88 }
89
90 this.group = group;
91
92 Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
93 for (ChannelFuture f: futures) {
94 futureMap.put(f.getChannel().getId(), f);
95 }
96
97 this.futures = Collections.unmodifiableMap(futureMap);
98
99 for (ChannelFuture f: this.futures.values()) {
100 f.addListener(childListener);
101 }
102
103
104 if (this.futures.isEmpty()) {
105 setDone();
106 }
107 }
108
109 DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
110 this.group = group;
111 this.futures = Collections.unmodifiableMap(futures);
112 for (ChannelFuture f: this.futures.values()) {
113 f.addListener(childListener);
114 }
115
116
117 if (this.futures.isEmpty()) {
118 setDone();
119 }
120 }
121
122 public ChannelGroup getGroup() {
123 return group;
124 }
125
126 public ChannelFuture find(Integer channelId) {
127 return futures.get(channelId);
128 }
129
130 public ChannelFuture find(Channel channel) {
131 return futures.get(channel.getId());
132 }
133
134 public Iterator<ChannelFuture> iterator() {
135 return futures.values().iterator();
136 }
137
138 public synchronized boolean isDone() {
139 return done;
140 }
141
142 public synchronized boolean isCompleteSuccess() {
143 return successCount == futures.size();
144 }
145
146 public synchronized boolean isPartialSuccess() {
147 return !futures.isEmpty() && successCount != 0;
148 }
149
150 public synchronized boolean isPartialFailure() {
151 return !futures.isEmpty() && failureCount != 0;
152 }
153
154 public synchronized boolean isCompleteFailure() {
155 return failureCount == futures.size();
156 }
157
158 public void addListener(ChannelGroupFutureListener listener) {
159 if (listener == null) {
160 throw new NullPointerException("listener");
161 }
162
163 boolean notifyNow = false;
164 synchronized (this) {
165 if (done) {
166 notifyNow = true;
167 } else {
168 if (firstListener == null) {
169 firstListener = listener;
170 } else {
171 if (otherListeners == null) {
172 otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
173 }
174 otherListeners.add(listener);
175 }
176 }
177 }
178
179 if (notifyNow) {
180 notifyListener(listener);
181 }
182 }
183
184 public void removeListener(ChannelGroupFutureListener listener) {
185 if (listener == null) {
186 throw new NullPointerException("listener");
187 }
188
189 synchronized (this) {
190 if (!done) {
191 if (listener == firstListener) {
192 if (otherListeners != null && !otherListeners.isEmpty()) {
193 firstListener = otherListeners.remove(0);
194 } else {
195 firstListener = null;
196 }
197 } else if (otherListeners != null) {
198 otherListeners.remove(listener);
199 }
200 }
201 }
202 }
203
204 public ChannelGroupFuture await() throws InterruptedException {
205 if (Thread.interrupted()) {
206 throw new InterruptedException();
207 }
208
209 synchronized (this) {
210 while (!done) {
211 checkDeadLock();
212 waiters++;
213 try {
214 this.wait();
215 } finally {
216 waiters--;
217 }
218 }
219 }
220 return this;
221 }
222
223 public boolean await(long timeout, TimeUnit unit)
224 throws InterruptedException {
225 return await0(unit.toNanos(timeout), true);
226 }
227
228 public boolean await(long timeoutMillis) throws InterruptedException {
229 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
230 }
231
232 public ChannelGroupFuture awaitUninterruptibly() {
233 boolean interrupted = false;
234 synchronized (this) {
235 while (!done) {
236 checkDeadLock();
237 waiters++;
238 try {
239 this.wait();
240 } catch (InterruptedException e) {
241 interrupted = true;
242 } finally {
243 waiters--;
244 }
245 }
246 }
247
248 if (interrupted) {
249 Thread.currentThread().interrupt();
250 }
251
252 return this;
253 }
254
255 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
256 try {
257 return await0(unit.toNanos(timeout), false);
258 } catch (InterruptedException e) {
259 throw new InternalError();
260 }
261 }
262
263 public boolean awaitUninterruptibly(long timeoutMillis) {
264 try {
265 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
266 } catch (InterruptedException e) {
267 throw new InternalError();
268 }
269 }
270
271 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
272 if (interruptable && Thread.interrupted()) {
273 throw new InterruptedException();
274 }
275
276 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
277 long waitTime = timeoutNanos;
278 boolean interrupted = false;
279
280 try {
281 synchronized (this) {
282 if (done) {
283 return done;
284 } else if (waitTime <= 0) {
285 return done;
286 }
287
288 checkDeadLock();
289 waiters++;
290 try {
291 for (;;) {
292 try {
293 this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
294 } catch (InterruptedException e) {
295 if (interruptable) {
296 throw e;
297 } else {
298 interrupted = true;
299 }
300 }
301
302 if (done) {
303 return true;
304 } else {
305 waitTime = timeoutNanos - (System.nanoTime() - startTime);
306 if (waitTime <= 0) {
307 return done;
308 }
309 }
310 }
311 } finally {
312 waiters--;
313 }
314 }
315 } finally {
316 if (interrupted) {
317 Thread.currentThread().interrupt();
318 }
319 }
320 }
321
322 private void checkDeadLock() {
323 if (IoWorkerRunnable.IN_IO_THREAD.get()) {
324 throw new IllegalStateException(
325 "await*() in I/O thread causes a dead lock or " +
326 "sudden performance drop. Use addListener() instead or " +
327 "call await*() from a different thread.");
328 }
329 }
330
331 boolean setDone() {
332 synchronized (this) {
333
334 if (done) {
335 return false;
336 }
337
338 done = true;
339 if (waiters > 0) {
340 notifyAll();
341 }
342 }
343
344 notifyListeners();
345 return true;
346 }
347
348 private void notifyListeners() {
349
350
351
352
353
354 if (firstListener != null) {
355 notifyListener(firstListener);
356 firstListener = null;
357
358 if (otherListeners != null) {
359 for (ChannelGroupFutureListener l: otherListeners) {
360 notifyListener(l);
361 }
362 otherListeners = null;
363 }
364 }
365 }
366
367 private void notifyListener(ChannelGroupFutureListener l) {
368 try {
369 l.operationComplete(this);
370 } catch (Throwable t) {
371 logger.warn(
372 "An exception was thrown by " +
373 ChannelFutureListener.class.getSimpleName() + ".", t);
374 }
375 }
376 }