1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.locks.ReadWriteLock;
29 import java.util.concurrent.locks.ReentrantReadWriteLock;
30
31 import org.jboss.netty.channel.ChannelPipelineFactory;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34 import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
35 import org.jboss.netty.util.internal.DetectionUtil;
36 import org.jboss.netty.util.internal.ReusableIterator;
37 import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public class HashedWheelTimer implements Timer {
83
84 static final InternalLogger logger =
85 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
86 private static final AtomicInteger id = new AtomicInteger();
87
88 private static final SharedResourceMisuseDetector misuseDetector =
89 new SharedResourceMisuseDetector(HashedWheelTimer.class);
90
91 private final Worker worker = new Worker();
92 final Thread workerThread;
93 final AtomicBoolean shutdown = new AtomicBoolean();
94
95 private final long roundDuration;
96 final long tickDuration;
97 final Set<HashedWheelTimeout>[] wheel;
98 final ReusableIterator<HashedWheelTimeout>[] iterators;
99 final int mask;
100 final ReadWriteLock lock = new ReentrantReadWriteLock();
101 volatile int wheelCursor;
102
103
104
105
106
107
/**
 * Creates a new timer backed by {@link Executors#defaultThreadFactory()},
 * with a tick duration of 100 milliseconds and a wheel of 512 ticks
 * (the defaults applied by the constructors this one delegates to).
 */
public HashedWheelTimer() {
    this(
            Executors.defaultThreadFactory());
}
111
112
113
114
115
116
117
118
119
/**
 * Creates a new timer backed by {@link Executors#defaultThreadFactory()}
 * with a wheel of 512 ticks (the default applied further down the
 * delegation chain).
 *
 * @param tickDuration the duration between two ticks
 * @param unit         the time unit of {@code tickDuration}
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
    this(
            Executors.defaultThreadFactory(),
            tickDuration, unit);
}
123
124
125
126
127
128
129
130
131
/**
 * Creates a new timer backed by {@link Executors#defaultThreadFactory()}.
 *
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 * @param ticksPerWheel the requested number of slots in the wheel
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(
            Executors.defaultThreadFactory(),
            tickDuration, unit, ticksPerWheel);
}
135
136
137
138
139
140
141
142
143
/**
 * Creates a new timer with a tick duration of 100 milliseconds and a
 * wheel of 512 ticks (the latter applied by the constructor this one
 * delegates to).
 *
 * @param threadFactory the factory that creates the background worker thread
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(
            threadFactory,
            100, TimeUnit.MILLISECONDS);
}
147
148
149
150
151
152
153
154
155
156
/**
 * Creates a new timer with a wheel of 512 ticks.
 *
 * @param threadFactory the factory that creates the background worker thread
 * @param tickDuration  the duration between two ticks
 * @param unit          the time unit of {@code tickDuration}
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(
            threadFactory,
            tickDuration, unit,
            512);
}
161
162
163
164
165
166
167
168
169
170
171
172 public HashedWheelTimer(
173 ThreadFactory threadFactory,
174 long tickDuration, TimeUnit unit, int ticksPerWheel) {
175
176 if (threadFactory == null) {
177 throw new NullPointerException("threadFactory");
178 }
179 if (unit == null) {
180 throw new NullPointerException("unit");
181 }
182 if (tickDuration <= 0) {
183 throw new IllegalArgumentException(
184 "tickDuration must be greater than 0: " + tickDuration);
185 }
186 if (ticksPerWheel <= 0) {
187 throw new IllegalArgumentException(
188 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
189 }
190
191
192 wheel = createWheel(ticksPerWheel);
193 iterators = createIterators(wheel);
194 mask = wheel.length - 1;
195
196
197 this.tickDuration = tickDuration = unit.toMillis(tickDuration);
198
199
200 if (tickDuration == Long.MAX_VALUE ||
201 tickDuration >= Long.MAX_VALUE / wheel.length) {
202 throw new IllegalArgumentException(
203 "tickDuration is too long: " +
204 tickDuration + ' ' + unit);
205 }
206
207 roundDuration = tickDuration * wheel.length;
208
209 workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
210 worker, "Hashed wheel timer #" + id.incrementAndGet()));
211
212
213 misuseDetector.increase();
214 }
215
216 @SuppressWarnings("unchecked")
217 private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
218 if (ticksPerWheel <= 0) {
219 throw new IllegalArgumentException(
220 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
221 }
222 if (ticksPerWheel > 1073741824) {
223 throw new IllegalArgumentException(
224 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
225 }
226
227 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
228 Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
229 for (int i = 0; i < wheel.length; i ++) {
230 wheel[i] = new MapBackedSet<HashedWheelTimeout>(
231 new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
232 }
233 return wheel;
234 }
235
236 @SuppressWarnings("unchecked")
237 private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
238 ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
239 for (int i = 0; i < wheel.length; i ++) {
240 iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
241 }
242 return iterators;
243 }
244
245 private static int normalizeTicksPerWheel(int ticksPerWheel) {
246 int normalizedTicksPerWheel = 1;
247 while (normalizedTicksPerWheel < ticksPerWheel) {
248 normalizedTicksPerWheel <<= 1;
249 }
250 return normalizedTicksPerWheel;
251 }
252
253
254
255
256
257
258
259
260 public synchronized void start() {
261 if (shutdown.get()) {
262 throw new IllegalStateException("cannot be started once stopped");
263 }
264
265 if (!workerThread.isAlive()) {
266 workerThread.start();
267 }
268 }
269
270 public synchronized Set<Timeout> stop() {
271 if (Thread.currentThread() == workerThread) {
272 throw new IllegalStateException(
273 HashedWheelTimer.class.getSimpleName() +
274 ".stop() cannot be called from " +
275 TimerTask.class.getSimpleName());
276 }
277
278 if (!shutdown.compareAndSet(false, true)) {
279 return Collections.emptySet();
280 }
281
282 boolean interrupted = false;
283 while (workerThread.isAlive()) {
284 workerThread.interrupt();
285 try {
286 workerThread.join(100);
287 } catch (InterruptedException e) {
288 interrupted = true;
289 }
290 }
291
292 if (interrupted) {
293 Thread.currentThread().interrupt();
294 }
295
296 misuseDetector.decrease();
297
298 Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
299 for (Set<HashedWheelTimeout> bucket: wheel) {
300 unprocessedTimeouts.addAll(bucket);
301 bucket.clear();
302 }
303
304 return Collections.unmodifiableSet(unprocessedTimeouts);
305 }
306
307 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
308 final long currentTime = System.currentTimeMillis();
309
310 if (task == null) {
311 throw new NullPointerException("task");
312 }
313 if (unit == null) {
314 throw new NullPointerException("unit");
315 }
316
317 if (!workerThread.isAlive()) {
318 start();
319 }
320
321 delay = unit.toMillis(delay);
322 HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
323 scheduleTimeout(timeout, delay);
324 return timeout;
325 }
326
327 void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
328
329
330 if (delay < tickDuration) {
331 delay = tickDuration;
332 }
333
334
335 final long lastRoundDelay = delay % roundDuration;
336 final long lastTickDelay = delay % tickDuration;
337 final long relativeIndex =
338 lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
339
340 final long remainingRounds =
341 delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
342
343
344 lock.readLock().lock();
345 try {
346 int stopIndex = (int) (wheelCursor + relativeIndex & mask);
347 timeout.stopIndex = stopIndex;
348 timeout.remainingRounds = remainingRounds;
349
350 wheel[stopIndex].add(timeout);
351 } finally {
352 lock.readLock().unlock();
353 }
354 }
355
356 private final class Worker implements Runnable {
357
358 private long startTime;
359 private long tick;
360
361 Worker() {
362 super();
363 }
364
365 public void run() {
366 List<HashedWheelTimeout> expiredTimeouts =
367 new ArrayList<HashedWheelTimeout>();
368
369 startTime = System.currentTimeMillis();
370 tick = 1;
371
372 while (!shutdown.get()) {
373 final long deadline = waitForNextTick();
374 if (deadline > 0) {
375 fetchExpiredTimeouts(expiredTimeouts, deadline);
376 notifyExpiredTimeouts(expiredTimeouts);
377 }
378 }
379 }
380
381 private void fetchExpiredTimeouts(
382 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
383
384
385
386
387
388 lock.writeLock().lock();
389 try {
390 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
391 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
392 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
393 } finally {
394 lock.writeLock().unlock();
395 }
396 }
397
398 private void fetchExpiredTimeouts(
399 List<HashedWheelTimeout> expiredTimeouts,
400 ReusableIterator<HashedWheelTimeout> i, long deadline) {
401
402 List<HashedWheelTimeout> slipped = null;
403 i.rewind();
404 while (i.hasNext()) {
405 HashedWheelTimeout timeout = i.next();
406 if (timeout.remainingRounds <= 0) {
407 i.remove();
408 if (timeout.deadline <= deadline) {
409 expiredTimeouts.add(timeout);
410 } else {
411
412
413
414
415 if (slipped == null) {
416 slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
417 }
418 slipped.add(timeout);
419 }
420 } else {
421 timeout.remainingRounds --;
422 }
423 }
424
425
426 if (slipped != null) {
427 for (HashedWheelTimeout timeout: slipped) {
428 scheduleTimeout(timeout, timeout.deadline - deadline);
429 }
430 }
431 }
432
433 private void notifyExpiredTimeouts(
434 List<HashedWheelTimeout> expiredTimeouts) {
435
436 for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
437 expiredTimeouts.get(i).expire();
438 }
439
440
441 expiredTimeouts.clear();
442 }
443
444 private long waitForNextTick() {
445 long deadline = startTime + tickDuration * tick;
446
447 for (;;) {
448 final long currentTime = System.currentTimeMillis();
449 long sleepTime = tickDuration * tick - (currentTime - startTime);
450
451
452
453
454
455
456 if (DetectionUtil.isWindows()) {
457 sleepTime = sleepTime / 10 * 10;
458 }
459
460 if (sleepTime <= 0) {
461 break;
462 }
463 try {
464 Thread.sleep(sleepTime);
465 } catch (InterruptedException e) {
466 if (shutdown.get()) {
467 return -1;
468 }
469 }
470 }
471
472
473 tick ++;
474 return deadline;
475 }
476 }
477
478 private final class HashedWheelTimeout implements Timeout {
479
480 private static final int ST_INIT = 0;
481 private static final int ST_CANCELLED = 1;
482 private static final int ST_EXPIRED = 2;
483
484 private final TimerTask task;
485 final long deadline;
486 volatile int stopIndex;
487 volatile long remainingRounds;
488 private final AtomicInteger state = new AtomicInteger(ST_INIT);
489
490 HashedWheelTimeout(TimerTask task, long deadline) {
491 this.task = task;
492 this.deadline = deadline;
493 }
494
495 public Timer getTimer() {
496 return HashedWheelTimer.this;
497 }
498
499 public TimerTask getTask() {
500 return task;
501 }
502
503 public void cancel() {
504 if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
505
506 return;
507 }
508
509 wheel[stopIndex].remove(this);
510 }
511
512 public boolean isCancelled() {
513 return state.get() == ST_CANCELLED;
514 }
515
516 public boolean isExpired() {
517 return state.get() != ST_INIT;
518 }
519
520 public void expire() {
521 if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
522 return;
523 }
524
525 try {
526 task.run(this);
527 } catch (Throwable t) {
528 if (logger.isWarnEnabled()) {
529 logger.warn(
530 "An exception was thrown by " +
531 TimerTask.class.getSimpleName() + ".", t);
532 }
533
534 }
535 }
536
537 @Override
538 public String toString() {
539 long currentTime = System.currentTimeMillis();
540 long remaining = deadline - currentTime;
541
542 StringBuilder buf = new StringBuilder(192);
543 buf.append(getClass().getSimpleName());
544 buf.append('(');
545
546 buf.append("deadline: ");
547 if (remaining > 0) {
548 buf.append(remaining);
549 buf.append(" ms later, ");
550 } else if (remaining < 0) {
551 buf.append(-remaining);
552 buf.append(" ms ago, ");
553 } else {
554 buf.append("now, ");
555 }
556
557 if (isCancelled()) {
558 buf.append(", cancelled");
559 }
560
561 return buf.append(')').toString();
562 }
563 }
564 }