View Javadoc

1   // ========================================================================
2   // Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.thread;
16  
17  import java.io.Serializable;
18  import java.util.ArrayList;
19  import java.util.HashSet;
20  import java.util.Iterator;
21  import java.util.List;
22  import java.util.Set;
23  
24  import org.mortbay.component.AbstractLifeCycle;
25  import org.mortbay.log.Log;
26  
27  /* ------------------------------------------------------------ */
28  /** A pool of threads.
29   * <p>
30   * Avoids the expense of thread creation by pooling threads after
31   * their run methods exit for reuse.
32   * <p>
33   * If an idle thread is available a job is directly dispatched,
34   * otherwise the job is queued.  After queuing a job, if the total
35   * number of threads is less than the maximum pool size, a new thread 
36   * is spawned.
37   * <p>
38   *
39   * @author Greg Wilkins <gregw@mortbay.com>
40   */
41  public class QueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
42  {
43      private String _name;
44      private Set _threads;
45      private List _idle;
46      private Runnable[] _jobs;
47      private int _nextJob;
48      private int _nextJobSlot;
49      private int _queued;
50      private int _maxQueued;
51      
52      private boolean _daemon;
53      private int _id;
54  
55      private final Object _lock = new Lock();
56      private final Object _threadsLock = new Lock();
57      private final Object _joinLock = new Lock();
58  
59      private long _lastShrink;
60      private int _maxIdleTimeMs=60000;
61      private int _maxThreads=250;
62      private int _minThreads=2;
63      private boolean _warned=false;
64      private int _lowThreads=0;
65      private int _priority= Thread.NORM_PRIORITY;
66      private int _spawnOrShrinkAt=0;
67      private int _maxStopTimeMs;
68  
69      
70      /* ------------------------------------------------------------------- */
71      /* Construct
72       */
73      public QueuedThreadPool()
74      {
75          _name="qtp-"+hashCode();
76      }
77      
78      /* ------------------------------------------------------------------- */
79      /* Construct
80       */
81      public QueuedThreadPool(int maxThreads)
82      {
83          this();
84          setMaxThreads(maxThreads);
85      }
86  
87      /* ------------------------------------------------------------ */
88      /** Run job.
89       * @return true 
90       */
91      public boolean dispatch(Runnable job) 
92      {  
93          if (!isRunning() || job==null)
94              return false;
95  
96          PoolThread thread=null;
97          boolean spawn=false;
98              
99          synchronized(_lock)
100         {
101             // Look for an idle thread
102             int idle=_idle.size();
103             if (idle>0)
104                 thread=(PoolThread)_idle.remove(idle-1);
105             else
106             {
107                 // queue the job
108                 _queued++;
109                 if (_queued>_maxQueued)
110                     _maxQueued=_queued;
111                 _jobs[_nextJobSlot++]=job;
112                 if (_nextJobSlot==_jobs.length)
113                     _nextJobSlot=0;
114                 if (_nextJobSlot==_nextJob)
115                 {
116                     // Grow the job queue
117                     Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
118                     int split=_jobs.length-_nextJob;
119                     if (split>0)
120                         System.arraycopy(_jobs,_nextJob,jobs,0,split);
121                     if (_nextJob!=0)
122                         System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
123                     
124                     _jobs=jobs;
125                     _nextJob=0;
126                     _nextJobSlot=_queued;
127                 }
128                   
129                 spawn=_queued>_spawnOrShrinkAt;
130             }
131         }
132         
133         if (thread!=null)
134         {
135             thread.dispatch(job);
136         }
137         else if (spawn)
138         {
139             newThread();
140         }
141         return true;
142     }
143 
144     /* ------------------------------------------------------------ */
145     /** Get the number of idle threads in the pool.
146      * @see #getThreads
147      * @return Number of threads
148      */
149     public int getIdleThreads()
150     {
151         return _idle==null?0:_idle.size();
152     }
153     
154     /* ------------------------------------------------------------ */
155     /**
156      * @return low resource threads threshhold
157      */
158     public int getLowThreads()
159     {
160         return _lowThreads;
161     }
162     
163     /* ------------------------------------------------------------ */
164     /**
165      * @return maximum queue size
166      */
167     public int getMaxQueued()
168     {
169         return _maxQueued;
170     }
171     
172     /* ------------------------------------------------------------ */
173     /** Get the maximum thread idle time.
174      * Delegated to the named or anonymous Pool.
175      * @see #setMaxIdleTimeMs
176      * @return Max idle time in ms.
177      */
178     public int getMaxIdleTimeMs()
179     {
180         return _maxIdleTimeMs;
181     }
182     
183     /* ------------------------------------------------------------ */
184     /** Set the maximum number of threads.
185      * Delegated to the named or anonymous Pool.
186      * @see #setMaxThreads
187      * @return maximum number of threads.
188      */
189     public int getMaxThreads()
190     {
191         return _maxThreads;
192     }
193 
194     /* ------------------------------------------------------------ */
195     /** Get the minimum number of threads.
196      * Delegated to the named or anonymous Pool.
197      * @see #setMinThreads
198      * @return minimum number of threads.
199      */
200     public int getMinThreads()
201     {
202         return _minThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** 
207      * @return The name of the BoundedThreadPool.
208      */
209     public String getName()
210     {
211         return _name;
212     }
213 
214     /* ------------------------------------------------------------ */
215     /** Get the number of threads in the pool.
216      * @see #getIdleThreads
217      * @return Number of threads
218      */
219     public int getThreads()
220     {
221         return _threads.size();
222     }
223 
224     /* ------------------------------------------------------------ */
225     /** Get the priority of the pool threads.
226      *  @return the priority of the pool threads.
227      */
228     public int getThreadsPriority()
229     {
230         return _priority;
231     }
232 
233     /* ------------------------------------------------------------ */
234     public int getQueueSize()
235     {
236         return _queued;
237     }
238     
239     /* ------------------------------------------------------------ */
240     /**
241      * @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed 
242      * before the thread pool is grown (or shrunk)
243      */
244     public int getSpawnOrShrinkAt()
245     {
246         return _spawnOrShrinkAt;
247     }
248 
249     /* ------------------------------------------------------------ */
250     /**
251      * @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed 
252      * before the thread pool is grown (or shrunk)
253      */
254     public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
255     {
256         _spawnOrShrinkAt=spawnOrShrinkAt;
257     }
258 
259     /* ------------------------------------------------------------ */
260     /**
261      * @return maximum total time that stop() will wait for threads to die.
262      */
263     public int getMaxStopTimeMs()
264     {
265         return _maxStopTimeMs;
266     }
267 
268     /* ------------------------------------------------------------ */
269     /**
270      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
271      */
272     public void setMaxStopTimeMs(int stopTimeMs)
273     {
274         _maxStopTimeMs = stopTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /** 
279      * Delegated to the named or anonymous Pool.
280      */
281     public boolean isDaemon()
282     {
283         return _daemon;
284     }
285 
286     /* ------------------------------------------------------------ */
287     public boolean isLowOnThreads()
288     {
289         return _queued>_lowThreads;
290     }
291 
292     /* ------------------------------------------------------------ */
293     public void join() throws InterruptedException
294     {
295         synchronized (_joinLock)
296         {
297             while (isRunning())
298                 _joinLock.wait();
299         }
300         
301         // TODO remove this semi busy loop!
302         while (isStopping())
303             Thread.sleep(100);
304     }
305 
306     /* ------------------------------------------------------------ */
307     /** 
308      * Delegated to the named or anonymous Pool.
309      */
310     public void setDaemon(boolean daemon)
311     {
312         _daemon=daemon;
313     }
314 
315     /* ------------------------------------------------------------ */
316     /**
317      * @param lowThreads low resource threads threshhold
318      */
319     public void setLowThreads(int lowThreads)
320     {
321         _lowThreads = lowThreads;
322     }
323     
324     /* ------------------------------------------------------------ */
325     /** Set the maximum thread idle time.
326      * Threads that are idle for longer than this period may be
327      * stopped.
328      * Delegated to the named or anonymous Pool.
329      * @see #getMaxIdleTimeMs
330      * @param maxIdleTimeMs Max idle time in ms.
331      */
332     public void setMaxIdleTimeMs(int maxIdleTimeMs)
333     {
334         _maxIdleTimeMs=maxIdleTimeMs;
335     }
336 
337     /* ------------------------------------------------------------ */
338     /** Set the maximum number of threads.
339      * Delegated to the named or anonymous Pool.
340      * @see #getMaxThreads
341      * @param maxThreads maximum number of threads.
342      */
343     public void setMaxThreads(int maxThreads)
344     {
345         if (isStarted() && maxThreads<_minThreads)
346             throw new IllegalArgumentException("!minThreads<maxThreads");
347         _maxThreads=maxThreads;
348     }
349 
350     /* ------------------------------------------------------------ */
351     /** Set the minimum number of threads.
352      * Delegated to the named or anonymous Pool.
353      * @see #getMinThreads
354      * @param minThreads minimum number of threads
355      */
356     public void setMinThreads(int minThreads)
357     {
358         if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
359             throw new IllegalArgumentException("!0<=minThreads<maxThreads");
360         _minThreads=minThreads;
361         synchronized (_threadsLock)
362         {
363             while (isStarted() && _threads.size()<_minThreads)
364             {
365                 newThread();   
366             }
367         }
368     }
369 
370     /* ------------------------------------------------------------ */
371     /** 
372      * @param name Name of the BoundedThreadPool to use when naming Threads.
373      */
374     public void setName(String name)
375     {
376         _name= name;
377     }
378 
379     /* ------------------------------------------------------------ */
380     /** Set the priority of the pool threads.
381      *  @param priority the new thread priority.
382      */
383     public void setThreadsPriority(int priority)
384     {
385         _priority=priority;
386     }
387 
388     /* ------------------------------------------------------------ */
389     /* Start the BoundedThreadPool.
390      * Construct the minimum number of threads.
391      */
392     protected void doStart() throws Exception
393     {
394         if (_maxThreads<_minThreads || _minThreads<=0)
395             throw new IllegalArgumentException("!0<minThreads<maxThreads");
396         
397         _threads=new HashSet();
398         _idle=new ArrayList();
399         _jobs=new Runnable[_maxThreads];
400         
401         for (int i=0;i<_minThreads;i++)
402         {
403             newThread();
404         }   
405     }
406 
407     /* ------------------------------------------------------------ */
408     /** Stop the BoundedThreadPool.
409      * New jobs are no longer accepted,idle threads are interrupted
410      * and stopJob is called on active threads.
411      * The method then waits 
412      * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
413      * stop, at which time killJob is called.
414      */
415     protected void doStop() throws Exception
416     {   
417         super.doStop();
418         
419         long start=System.currentTimeMillis();
420         for (int i=0;i<100;i++)
421         {
422             synchronized (_threadsLock)
423             {
424                 Iterator iter = _threads.iterator();
425                 while (iter.hasNext())
426                     ((Thread)iter.next()).interrupt();
427             }
428             
429             Thread.yield();
430             if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
431                break;
432             
433             try
434             {
435                 Thread.sleep(i*100);
436             }
437             catch(InterruptedException e){}
438             
439             
440         }
441 
442         // TODO perhaps force stops
443         if (_threads.size()>0)
444             Log.warn(_threads.size()+" threads could not be stopped");
445         
446         synchronized (_joinLock)
447         {
448             _joinLock.notifyAll();
449         }
450     }
451 
452     /* ------------------------------------------------------------ */
453     protected void newThread()
454     {
455         synchronized (_threadsLock)
456         {
457             if (_threads.size()<_maxThreads)
458             {
459                 PoolThread thread =new PoolThread();
460                 _threads.add(thread);
461                 thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
462                 thread.start(); 
463             }
464             else if (!_warned)    
465             {
466                 _warned=true;
467                 Log.debug("Max threads for {}",this);
468             }
469         }
470     }
471 
472     /* ------------------------------------------------------------ */
473     /** Stop a Job.
474      * This method is called by the Pool if a job needs to be stopped.
475      * The default implementation does nothing and should be extended by a
476      * derived thread pool class if special action is required.
477      * @param thread The thread allocated to the job, or null if no thread allocated.
478      * @param job The job object passed to run.
479      */
480     protected void stopJob(Thread thread, Object job)
481     {
482         thread.interrupt();
483     }
484     
485 
486     /* ------------------------------------------------------------ */
487     public String dump()
488     {
489         StringBuffer buf = new StringBuffer();
490 
491         synchronized (_threadsLock)
492         {
493             for (Iterator i=_threads.iterator();i.hasNext();)
494             {
495                 Thread thread = (Thread)i.next();
496                 buf.append(thread.getName()).append(" ").append(thread.toString()).append('\n');
497             }
498         }
499         
500         return buf.toString();
501     }
502     
503     /* ------------------------------------------------------------ */
504     /**
505      * @param name The thread name to stop.
506      * @return true if the thread was found and stopped.
507      * @Deprecated Use {@link #interruptThread(long)} in preference
508      */
509     public boolean stopThread(String name)
510     {
511         synchronized (_threadsLock)
512         {
513             for (Iterator i=_threads.iterator();i.hasNext();)
514             {
515                 Thread thread = (Thread)i.next();
516                 if (name.equals(thread.getName()))
517                 {
518                     thread.stop();
519                     return true;
520                 }
521             }
522         }
523         return false;
524     }
525     
526     /* ------------------------------------------------------------ */
527     /**
528      * @param name The thread name to interrupt.
529      * @return true if the thread was found and interrupted.
530      */
531     public boolean interruptThread(String name)
532     {
533         synchronized (_threadsLock)
534         {
535             for (Iterator i=_threads.iterator();i.hasNext();)
536             {
537                 Thread thread = (Thread)i.next();
538                 if (name.equals(thread.getName()))
539                 {
540                     thread.interrupt();
541                     return true;
542                 }
543             }
544         }
545         return false;
546     }
547 
548     /* ------------------------------------------------------------ */
549     /** Pool Thread class.
550      * The PoolThread allows the threads job to be
551      * retrieved and active status to be indicated.
552      */
553     public class PoolThread extends Thread 
554     {
555         Runnable _job=null;
556 
557         /* ------------------------------------------------------------ */
558         PoolThread()
559         {
560             setDaemon(_daemon);
561             setPriority(_priority);
562         }
563         
564         /* ------------------------------------------------------------ */
565         /** BoundedThreadPool run.
566          * Loop getting jobs and handling them until idle or stopped.
567          */
568         public void run()
569         {
570             boolean idle=false;
571             Runnable job=null;
572             try
573             {
574                 while (isRunning())
575                 {   
576                     // Run any job that we have.
577                     if (job!=null)
578                     {
579                         final Runnable todo=job;
580                         job=null;
581                         idle=false;
582                         todo.run();
583                     }
584                     
585                     synchronized(_lock)
586                     {
587                         // is there a queued job?
588                         if (_queued>0)
589                         {
590                             _queued--;
591                             job=_jobs[_nextJob];
592                             _jobs[_nextJob++]=null;
593                             if (_nextJob==_jobs.length)
594                                 _nextJob=0;
595                             continue;
596                         }
597 
598                         // Should we shrink?
599                         final int threads=_threads.size();
600                         if (threads>_minThreads && 
601                             (threads>_maxThreads || 
602                              _idle.size()>_spawnOrShrinkAt))   
603                         {
604                             long now = System.currentTimeMillis();
605                             if ((now-_lastShrink)>getMaxIdleTimeMs())
606                             {
607                                 _lastShrink=now;
608                                 _idle.remove(this);
609                                 return;
610                             }
611                         }
612 
613                         if (!idle)
614                         {   
615                             // Add ourselves to the idle set.
616                             _idle.add(this);
617                             idle=true;
618                         }
619                     }
620 
621                     // We are idle
622                     // wait for a dispatched job
623                     synchronized (this)
624                     {
625                         if (_job==null)
626                             this.wait(getMaxIdleTimeMs());
627                         job=_job;
628                         _job=null;
629                     }
630                 }
631             }
632             catch (InterruptedException e)
633             {
634                 Log.ignore(e);
635             }
636             finally
637             {
638                 synchronized (_lock)
639                 {
640                     _idle.remove(this);
641                 }
642                 synchronized (_threadsLock)
643                 {
644                     _threads.remove(this);
645                 }
646                 synchronized (this)
647                 {
648                     job=_job;
649                 }
650                 
651                 // we died with a job! reschedule it
652                 if (job!=null)
653                 {
654                     QueuedThreadPool.this.dispatch(job);
655                 }
656             }
657         }
658         
659         /* ------------------------------------------------------------ */
660         void dispatch(Runnable job)
661         {
662             synchronized (this)
663             {
664                 _job=job;
665                 this.notify();
666             }
667         }
668     }
669 
670     private class Lock{}
671 }