View Javadoc

1   // ========================================================================
2   // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.thread;
16  
17  import java.io.Serializable;
18  import java.util.ArrayList;
19  import java.util.HashSet;
20  import java.util.Iterator;
21  import java.util.LinkedList;
22  import java.util.List;
23  import java.util.Set;
24  
25  import org.mortbay.component.AbstractLifeCycle;
26  import org.mortbay.log.Log;
27  
28  /* ------------------------------------------------------------ */
29  /** A pool of threads.
30   * <p>
31   * Avoids the expense of thread creation by pooling threads after
32   * their run methods exit for reuse.
33   * <p>
34   * If the maximum pool size is reached, jobs wait for a free thread.
35   * By default there is no maximum pool size.  Idle threads timeout
36   * and terminate until the minimum number of threads are running.
37   * <p>
38   * @deprecated Use {@link QueuedThreadPool}
39   * @author Greg Wilkins <gregw@mortbay.com>
40   * @author Juancarlo Anez <juancarlo@modelistica.com>
41   */
42  public class BoundedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
43  {
44      private static int __id;
45      private boolean _daemon;
46      private int _id;
47      private List _idle;
48  
49      private final Object _lock = new Object();
50      private final Object _joinLock = new Object();
51  
52      private long _lastShrink;
53      private int _maxIdleTimeMs=60000;
54      private int _maxThreads=255;
55      private int _minThreads=1;
56      private String _name;
57      private List _queue;
58      private Set _threads;
59      private boolean _warned=false;
60      int _lowThreads=0;
61      int _priority= Thread.NORM_PRIORITY;
62  
63      /* ------------------------------------------------------------------- */
64      /* Construct
65       */
66      public BoundedThreadPool()
67      {
68          _name="btpool"+__id++;
69      }
70  
71      /* ------------------------------------------------------------ */
72      /** Run job.
73       * @return true 
74       */
75      public boolean dispatch(Runnable job) 
76      {
77          synchronized(_lock)
78          {       
79              if (!isRunning() || job==null)
80                  return false;
81  
82              // Are there no threads available?
83              int idle=_idle.size();
84              if (idle>0)
85              {
86                  PoolThread thread=(PoolThread)_idle.remove(idle-1);
87                  thread.dispatch(job);
88              }
89              else
90              {
91                  // Are we at max size?
92                  if (_threads.size()<_maxThreads)
93                  {    
94                      // No - create a new thread!
95                      newThread(job);
96                  }
97                  else 
98                  {
99                      if (!_warned)    
100                     {
101                         _warned=true;
102                         Log.debug("Out of threads for {}",this);
103                     }
104                     _queue.add(job);
105                 }
106             }
107         }
108 
109         return true;
110     }
111 
112     /* ------------------------------------------------------------ */
113     /** Get the number of idle threads in the pool.
114      * @see #getThreads
115      * @return Number of threads
116      */
117     public int getIdleThreads()
118     {
119         return _idle==null?0:_idle.size();
120     }
121     
122     /* ------------------------------------------------------------ */
123     /**
124      * @return low resource threads threshhold
125      */
126     public int getLowThreads()
127     {
128         return _lowThreads;
129     }
130 
131     
132     /* ------------------------------------------------------------ */
133     /** Get the maximum thread idle time.
134      * Delegated to the named or anonymous Pool.
135      * @see #setMaxIdleTimeMs
136      * @return Max idle time in ms.
137      */
138     public int getMaxIdleTimeMs()
139     {
140         return _maxIdleTimeMs;
141     }
142     
143     /* ------------------------------------------------------------ */
144     /** Set the maximum number of threads.
145      * Delegated to the named or anonymous Pool.
146      * @see #setMaxThreads
147      * @return maximum number of threads.
148      */
149     public int getMaxThreads()
150     {
151         return _maxThreads;
152     }
153 
154     /* ------------------------------------------------------------ */
155     /** Get the minimum number of threads.
156      * Delegated to the named or anonymous Pool.
157      * @see #setMinThreads
158      * @return minimum number of threads.
159      */
160     public int getMinThreads()
161     {
162         return _minThreads;
163     }
164 
165     /* ------------------------------------------------------------ */
166     /** 
167      * @return The name of the BoundedThreadPool.
168      */
169     public String getName()
170     {
171         return _name;
172     }
173 
174     /* ------------------------------------------------------------ */
175     /** Get the number of threads in the pool.
176      * @see #getIdleThreads
177      * @return Number of threads
178      */
179     public int getThreads()
180     {
181         return _threads.size();
182     }
183 
184     /* ------------------------------------------------------------ */
185     /** Get the priority of the pool threads.
186      *  @return the priority of the pool threads.
187      */
188     public int getThreadsPriority()
189     {
190         return _priority;
191     }
192 
193     /* ------------------------------------------------------------ */
194     public int getQueueSize()
195     {
196         synchronized(_lock)
197         {
198             return _queue.size();
199         }
200     }
201 
202     /* ------------------------------------------------------------ */
203     /** 
204      * Delegated to the named or anonymous Pool.
205      */
206     public boolean isDaemon()
207     {
208         return _daemon;
209     }
210 
211     /* ------------------------------------------------------------ */
212     public boolean isLowOnThreads()
213     {
214         synchronized(_lock)
215         {
216             // maybe make this volatile?
217             return _queue.size()>_lowThreads;
218         }
219     }
220 
221     /* ------------------------------------------------------------ */
222     public void join() throws InterruptedException
223     {
224         synchronized (_joinLock)
225         {
226             while (isRunning())
227                 _joinLock.wait();
228         }
229         
230         // TODO remove this semi busy loop!
231         while (isStopping())
232             Thread.sleep(10);
233     }
234 
235     /* ------------------------------------------------------------ */
236     /** 
237      * Delegated to the named or anonymous Pool.
238      */
239     public void setDaemon(boolean daemon)
240     {
241         _daemon=daemon;
242     }
243 
244     /* ------------------------------------------------------------ */
245     /**
246      * @param lowThreads low resource threads threshhold
247      */
248     public void setLowThreads(int lowThreads)
249     {
250         _lowThreads = lowThreads;
251     }
252     
253     /* ------------------------------------------------------------ */
254     /** Set the maximum thread idle time.
255      * Threads that are idle for longer than this period may be
256      * stopped.
257      * Delegated to the named or anonymous Pool.
258      * @see #getMaxIdleTimeMs
259      * @param maxIdleTimeMs Max idle time in ms.
260      */
261     public void setMaxIdleTimeMs(int maxIdleTimeMs)
262     {
263         _maxIdleTimeMs=maxIdleTimeMs;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Set the maximum number of threads.
268      * Delegated to the named or anonymous Pool.
269      * @see #getMaxThreads
270      * @param maxThreads maximum number of threads.
271      */
272     public void setMaxThreads(int maxThreads)
273     {
274         if (isStarted() && maxThreads<_minThreads)
275             throw new IllegalArgumentException("!minThreads<maxThreads");
276         _maxThreads=maxThreads;
277     }
278 
279     /* ------------------------------------------------------------ */
280     /** Set the minimum number of threads.
281      * Delegated to the named or anonymous Pool.
282      * @see #getMinThreads
283      * @param minThreads minimum number of threads
284      */
285     public void setMinThreads(int minThreads)
286     {
287         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
288             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
289         _minThreads=minThreads;
290         synchronized (_lock)
291         {
292             while (isStarted() && _threads.size()<_minThreads)
293             {
294                 newThread(null);   
295             }
296         }
297     }
298 
299     /* ------------------------------------------------------------ */
300     /** 
301      * @param name Name of the BoundedThreadPool to use when naming Threads.
302      */
303     public void setName(String name)
304     {
305         _name= name;
306     }
307 
308     /* ------------------------------------------------------------ */
309     /** Set the priority of the pool threads.
310      *  @param priority the new thread priority.
311      */
312     public void setThreadsPriority(int priority)
313     {
314         _priority=priority;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /* Start the BoundedThreadPool.
319      * Construct the minimum number of threads.
320      */
321     protected void doStart() throws Exception
322     {
323         if (_maxThreads<_minThreads || _minThreads<=0)
324             throw new IllegalArgumentException("!0<minThreads<maxThreads");
325         
326         _threads=new HashSet();
327         _idle=new ArrayList();
328         _queue=new LinkedList();
329         
330         for (int i=0;i<_minThreads;i++)
331         {
332             newThread(null);
333         }   
334     }
335 
336     /* ------------------------------------------------------------ */
337     /** Stop the BoundedThreadPool.
338      * New jobs are no longer accepted,idle threads are interrupted
339      * and stopJob is called on active threads.
340      * The method then waits 
341      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
342      * stop, at which time killJob is called.
343      */
344     protected void doStop() throws Exception
345     {   
346         super.doStop();
347         
348         for (int i=0;i<100;i++)
349         {
350             synchronized (_lock)
351             {
352                 Iterator iter = _threads.iterator();
353                 while (iter.hasNext())
354                     ((Thread)iter.next()).interrupt();
355             }
356             
357             Thread.yield();
358             if (_threads.size()==0)
359                break;
360             
361             try
362             {
363                 Thread.sleep(i*100);
364             }
365             catch(InterruptedException e){}
366         }
367 
368         // TODO perhaps force stops
369         if (_threads.size()>0)
370             Log.warn(_threads.size()+" threads could not be stopped");
371         
372         synchronized (_joinLock)
373         {
374             _joinLock.notifyAll();
375         }
376     }
377 
378     /* ------------------------------------------------------------ */
379     protected PoolThread newThread(Runnable job)
380     {
381         synchronized(_lock)
382         {
383             PoolThread thread =new PoolThread(job);
384             _threads.add(thread);
385             thread.setName(_name+"-"+_id++);
386             thread.start();  
387             return thread;
388         }
389     }
390 
391     /* ------------------------------------------------------------ */
392     /** Stop a Job.
393      * This method is called by the Pool if a job needs to be stopped.
394      * The default implementation does nothing and should be extended by a
395      * derived thread pool class if special action is required.
396      * @param thread The thread allocated to the job, or null if no thread allocated.
397      * @param job The job object passed to run.
398      */
399     protected void stopJob(Thread thread, Object job)
400     {
401         thread.interrupt();
402     }
403     
404 
405     /* ------------------------------------------------------------ */
406     /** Pool Thread class.
407      * The PoolThread allows the threads job to be
408      * retrieved and active status to be indicated.
409      */
410     public class PoolThread extends Thread 
411     {
412         Runnable _job=null;
413 
414         /* ------------------------------------------------------------ */
415         PoolThread()
416         {
417             setDaemon(_daemon);
418             setPriority(_priority);
419         }
420 
421         /* ------------------------------------------------------------ */
422         PoolThread(Runnable job)
423         {
424             setDaemon(_daemon);
425             setPriority(_priority);
426             _job=job;
427         }
428 
429         /* ------------------------------------------------------------ */
430         /** BoundedThreadPool run.
431          * Loop getting jobs and handling them until idle or stopped.
432          */
433         public void run()
434         {
435             try
436             {
437                 Runnable job=null;
438 
439                 synchronized (this)
440                 {
441                     job=_job;
442                     _job=null;
443                 }
444                 
445                 while (isRunning())
446                 {
447                     if (job!=null)
448                     {
449                         Runnable todo=job;
450                         job=null;
451                         todo.run();
452                     }
453                     else
454                     {
455                         // No job
456                         synchronized (_lock)
457                         {
458                             // is there a queued job?
459                             if (_queue.size()>0)
460                             {
461                                 job=(Runnable)_queue.remove(0);
462                                 continue;
463                             }
464                             else
465                             {
466                                 _warned=false;
467                                 
468                                 // consider shrinking the thread pool
469                                 if (_threads.size()>_maxThreads ||     // we have too many threads  OR
470                                     _idle.size()>0 &&                  // are there idle threads?
471                                     _threads.size()>_minThreads)       // AND are there more than min threads?
472                                 {
473                                     long now = System.currentTimeMillis();
474                                     if ((now-_lastShrink)>getMaxIdleTimeMs())
475                                     {
476                                         _lastShrink=now;
477                                         return;
478                                     }
479                                 }
480                             }
481                                
482                             // we are going idle!
483                             _idle.add(this);
484                         }
485 
486                         try
487                         {
488                             synchronized (this)
489                             {
490                                 if (_job==null)
491                                     this.wait(getMaxIdleTimeMs());
492                                 job=_job;
493                                 _job=null;
494                             }
495                         }
496                         catch (InterruptedException e)
497                         {
498                             Log.ignore(e);
499                         }
500                         finally
501                         {
502                             synchronized (_lock)
503                             {
504                                 _idle.remove(this);
505                             }
506                         }
507                     }
508                 }
509             }
510             finally
511             {
512                 synchronized (_lock)
513                 {
514                     _threads.remove(this);
515                 }
516                 
517                 Runnable job=null;
518                 synchronized (this)
519                 {
520                     job=_job;
521                 }
522                 if (job!=null && isRunning())
523                     BoundedThreadPool.this.dispatch(job);
524             }
525         }
526         
527         /* ------------------------------------------------------------ */
528         void dispatch(Runnable job)
529         {
530             synchronized (this)
531             {
532                 if(_job!=null || job==null)
533                     throw new IllegalStateException();
534                 _job=job;
535                 this.notify();
536             }
537         }
538     }
539 
540 }