1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.thread;
16
17 import java.io.Serializable;
18 import java.util.ArrayList;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Set;
24
25 import org.mortbay.component.AbstractLifeCycle;
26 import org.mortbay.log.Log;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public class BoundedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
43 {
44 private static int __id;
45 private boolean _daemon;
46 private int _id;
47 private List _idle;
48
49 private final Object _lock = new Object();
50 private final Object _joinLock = new Object();
51
52 private long _lastShrink;
53 private int _maxIdleTimeMs=60000;
54 private int _maxThreads=255;
55 private int _minThreads=1;
56 private String _name;
57 private List _queue;
58 private Set _threads;
59 private boolean _warned=false;
60 int _lowThreads=0;
61 int _priority= Thread.NORM_PRIORITY;
62
63
64
65
66 public BoundedThreadPool()
67 {
68 _name="btpool"+__id++;
69 }
70
71
72
73
74
75 public boolean dispatch(Runnable job)
76 {
77 synchronized(_lock)
78 {
79 if (!isRunning() || job==null)
80 return false;
81
82
83 int idle=_idle.size();
84 if (idle>0)
85 {
86 PoolThread thread=(PoolThread)_idle.remove(idle-1);
87 thread.dispatch(job);
88 }
89 else
90 {
91
92 if (_threads.size()<_maxThreads)
93 {
94
95 newThread(job);
96 }
97 else
98 {
99 if (!_warned)
100 {
101 _warned=true;
102 Log.debug("Out of threads for {}",this);
103 }
104 _queue.add(job);
105 }
106 }
107 }
108
109 return true;
110 }
111
112
113
114
115
116
117 public int getIdleThreads()
118 {
119 return _idle==null?0:_idle.size();
120 }
121
122
123
124
125
126 public int getLowThreads()
127 {
128 return _lowThreads;
129 }
130
131
132
133
134
135
136
137
138 public int getMaxIdleTimeMs()
139 {
140 return _maxIdleTimeMs;
141 }
142
143
144
145
146
147
148
149 public int getMaxThreads()
150 {
151 return _maxThreads;
152 }
153
154
155
156
157
158
159
160 public int getMinThreads()
161 {
162 return _minThreads;
163 }
164
165
166
167
168
169 public String getName()
170 {
171 return _name;
172 }
173
174
175
176
177
178
179 public int getThreads()
180 {
181 return _threads.size();
182 }
183
184
185
186
187
188 public int getThreadsPriority()
189 {
190 return _priority;
191 }
192
193
194 public int getQueueSize()
195 {
196 synchronized(_lock)
197 {
198 return _queue.size();
199 }
200 }
201
202
203
204
205
206 public boolean isDaemon()
207 {
208 return _daemon;
209 }
210
211
212 public boolean isLowOnThreads()
213 {
214 synchronized(_lock)
215 {
216
217 return _queue.size()>_lowThreads;
218 }
219 }
220
221
222 public void join() throws InterruptedException
223 {
224 synchronized (_joinLock)
225 {
226 while (isRunning())
227 _joinLock.wait();
228 }
229
230
231 while (isStopping())
232 Thread.sleep(10);
233 }
234
235
236
237
238
239 public void setDaemon(boolean daemon)
240 {
241 _daemon=daemon;
242 }
243
244
245
246
247
248 public void setLowThreads(int lowThreads)
249 {
250 _lowThreads = lowThreads;
251 }
252
253
254
255
256
257
258
259
260
261 public void setMaxIdleTimeMs(int maxIdleTimeMs)
262 {
263 _maxIdleTimeMs=maxIdleTimeMs;
264 }
265
266
267
268
269
270
271
272 public void setMaxThreads(int maxThreads)
273 {
274 if (isStarted() && maxThreads<_minThreads)
275 throw new IllegalArgumentException("!minThreads<maxThreads");
276 _maxThreads=maxThreads;
277 }
278
279
280
281
282
283
284
285 public void setMinThreads(int minThreads)
286 {
287 if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
288 throw new IllegalArgumentException("!0<=minThreads<maxThreads");
289 _minThreads=minThreads;
290 synchronized (_lock)
291 {
292 while (isStarted() && _threads.size()<_minThreads)
293 {
294 newThread(null);
295 }
296 }
297 }
298
299
300
301
302
303 public void setName(String name)
304 {
305 _name= name;
306 }
307
308
309
310
311
312 public void setThreadsPriority(int priority)
313 {
314 _priority=priority;
315 }
316
317
318
319
320
321 protected void doStart() throws Exception
322 {
323 if (_maxThreads<_minThreads || _minThreads<=0)
324 throw new IllegalArgumentException("!0<minThreads<maxThreads");
325
326 _threads=new HashSet();
327 _idle=new ArrayList();
328 _queue=new LinkedList();
329
330 for (int i=0;i<_minThreads;i++)
331 {
332 newThread(null);
333 }
334 }
335
336
337
338
339
340
341
342
343
344 protected void doStop() throws Exception
345 {
346 super.doStop();
347
348 for (int i=0;i<100;i++)
349 {
350 synchronized (_lock)
351 {
352 Iterator iter = _threads.iterator();
353 while (iter.hasNext())
354 ((Thread)iter.next()).interrupt();
355 }
356
357 Thread.yield();
358 if (_threads.size()==0)
359 break;
360
361 try
362 {
363 Thread.sleep(i*100);
364 }
365 catch(InterruptedException e){}
366 }
367
368
369 if (_threads.size()>0)
370 Log.warn(_threads.size()+" threads could not be stopped");
371
372 synchronized (_joinLock)
373 {
374 _joinLock.notifyAll();
375 }
376 }
377
378
379 protected PoolThread newThread(Runnable job)
380 {
381 synchronized(_lock)
382 {
383 PoolThread thread =new PoolThread(job);
384 _threads.add(thread);
385 thread.setName(_name+"-"+_id++);
386 thread.start();
387 return thread;
388 }
389 }
390
391
392
393
394
395
396
397
398
399 protected void stopJob(Thread thread, Object job)
400 {
401 thread.interrupt();
402 }
403
404
405
406
407
408
409
410 public class PoolThread extends Thread
411 {
412 Runnable _job=null;
413
414
415 PoolThread()
416 {
417 setDaemon(_daemon);
418 setPriority(_priority);
419 }
420
421
422 PoolThread(Runnable job)
423 {
424 setDaemon(_daemon);
425 setPriority(_priority);
426 _job=job;
427 }
428
429
430
431
432
433 public void run()
434 {
435 try
436 {
437 Runnable job=null;
438
439 synchronized (this)
440 {
441 job=_job;
442 _job=null;
443 }
444
445 while (isRunning())
446 {
447 if (job!=null)
448 {
449 Runnable todo=job;
450 job=null;
451 todo.run();
452 }
453 else
454 {
455
456 synchronized (_lock)
457 {
458
459 if (_queue.size()>0)
460 {
461 job=(Runnable)_queue.remove(0);
462 continue;
463 }
464 else
465 {
466 _warned=false;
467
468
469 if (_threads.size()>_maxThreads ||
470 _idle.size()>0 &&
471 _threads.size()>_minThreads)
472 {
473 long now = System.currentTimeMillis();
474 if ((now-_lastShrink)>getMaxIdleTimeMs())
475 {
476 _lastShrink=now;
477 return;
478 }
479 }
480 }
481
482
483 _idle.add(this);
484 }
485
486 try
487 {
488 synchronized (this)
489 {
490 if (_job==null)
491 this.wait(getMaxIdleTimeMs());
492 job=_job;
493 _job=null;
494 }
495 }
496 catch (InterruptedException e)
497 {
498 Log.ignore(e);
499 }
500 finally
501 {
502 synchronized (_lock)
503 {
504 _idle.remove(this);
505 }
506 }
507 }
508 }
509 }
510 finally
511 {
512 synchronized (_lock)
513 {
514 _threads.remove(this);
515 }
516
517 Runnable job=null;
518 synchronized (this)
519 {
520 job=_job;
521 }
522 if (job!=null && isRunning())
523 BoundedThreadPool.this.dispatch(job);
524 }
525 }
526
527
528 void dispatch(Runnable job)
529 {
530 synchronized (this)
531 {
532 if(_job!=null || job==null)
533 throw new IllegalStateException();
534 _job=job;
535 this.notify();
536 }
537 }
538 }
539
540 }