哈哈,身为菜鸡的我的又来分析一下Java的源码了,上一篇是ThreadLocal的源码分析,这篇也是一篇关于线程的文章--ThreadPoolExecutor。还是那样,如果有错误之处,希望各位指正!
本文参考资料:
1. Java并发编程:线程池的使用
2.【JUC】JDK1.8源码分析之ThreadPoolExecutor(一)
3.深入理解java线程池—ThreadPoolExecutor
4.Java并发编程:Lock
5.Cay S.Horstmann的《Java 核心技术卷I》
由于ThreadPoolExecutor的源代码中设计到了Java中的重入锁,大家可以参考:Java并发编程:Lock和Cay S.Horstmann的《Java 核心技术卷I》第10版的646页。
1.什么是ThreadPoolExecutor?
ThreadPoolExecutor,线程池。我是这么来理解的,线程池相当于是一个池子,里面有多的线程,当用户需要一个线程来执行他的代码块时,直接将他自己的代码块提交到线程池里面去,让线程池来负责执行他的任务。记住,这里是直接将任务提交到线程池,直接让线程池来执行,而不是从线程池取一个获取一个线程来执行,在这之前,我一直都是这么认为的。。。
那为什么需要线程池呢?那楼主之前的经历来说吧,在楼主了解线程池之前,如果想要使用线程的话,直接new一个线程来使用。这个操作有什么缺点呢?
1.消耗系统资源,如果频繁的创建线程和销毁线程的话,会占用大量的系统资源。
2.结构混乱,如果想要使用线程就直接创建的话,这样导致程序结构比较混乱。
3.容易导致内容泄露,这一点楼主从Android里面得到,由于线程常常伴随着耗时操作,非常容易导致内存泄露。
而使用线程池的话,首先系统会给我们维护一个池子,用来存储一定数量的线程,用来供我们使用,这样就避免了频繁的创建线程和销毁线程的情况了;其次,使用线程池的话,我们就不用手动的来创建线程,由池子来给我们创建和维护;最后,由于线程池用来shutdown的机制,在一定的程度上可以避免内存泄露的情况出现。
2.认识ThreadPoolExecutor
(1).ThreadPoolExecutor的基本使用
可能有些老哥对线程池的使用还不是很会,这里在给大家介绍一下基本使用:
public class Demo { public static void main(String[] args) { //创建了核心线程数为2、最大线程数也为的2的线程池 ExecutorService service = Executors.newFixedThreadPool(2); //往线程池里面提交任务 service.submit(new MyRunnable("pby1", 2)); service.submit(new MyRunnable("pby2", 4)); //关闭线程池 service.shutdown(); } static class MyRunnable implements Runnable { private String name = null; private int count = 0; public MyRunnable(String name, int count) { this.name = name; this.count = count; } @Override public void run() { for(int i = 0; i < count; i++){ System.out.println(name + " count = " + i); } } }}
从Demo代码中,我们可以看到,首先我们创建一个线程池的对象,然后往线程池里面提交任务,最后在关闭线程池。这就是线程基本使用的步骤,其实创建线程池的对象有很多种方法,这里使用了其中的一种方式,待会我们在看ThreadPoolExecutor类构造方法的时候,就知道怎么通过构造方法来创建一个线程池的对象,这里先不进行分析。
线程池的使用时非常的简单,现在我们从源代码的方向来分析一下ThreadPoolExecutor的实现方法。
(2).ThreaPoolExecutor的变量
我们要想理解ThreadPoolExecutor,先从它的成员变量,看看他的成员变量有哪些,分别表示什么意思。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
上面的都是ThreadLocal的一些常量,接下来我们一一分析每个常量的意思。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个变量是非常重要,在整个ThreadLocal的过程中占着十分重要的位置。这里我们可以将这个变量理解成为当前线程池的控制状态,这个控制状态有两个部分组成:一是当前的线程运行状态,运行状态是什么,接下来会详细的解释;二是当前线程里面有效的线程数目,我们先不管这里‘有效’表示的是什么意思,先暂且将它理解为整个线程池里面可以使用的线程数目。
private static final int COUNT_BITS = Integer.SIZE - 3;
这个我们知道COUNT_BITS为29,这个表示意思的是,线程数目表示的范围是0~2 ^ 29。至于这个有什么用,待会讲几种运行的状态就在解释。这里先这么认为,这个线程池中含有的线程数目最多为2 ^ 29个。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
CAPACITY常量,我们从意思上看出来,将1左移了29,相当于是2 ^ 29,具体的值为1后面29个0(二进制),然后减1变成了29个1。这个相当于一个mask,跟任何一个数字进行&运算,取得的值都是低29位。
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
上面的5个常量就表示就是线程池5种状态,这里先将他们的值解释一下,然后解释每种状态的意思。
1.RUNNING:111(int的高3位);表示意思是运行状态,即当前的线程池正在运行,能够正常接收runable对象来执行。
2.SHUTDOWN:000;表示意思是当前线程处于关闭状态,处于当前状态下,线程池不能再接收任务,但是会继续执行任务队列里面的任务。
3.STOP:001;表示意思是当前的线程处于停止状态,此时,线程池不再接收任务,也不会执行任务队列里面的任务,同时还会中断正在执行的任务。
4.TIDYING:010; 表示意思当前的线程处于清理阶段,此时可用的线程数目为0。
5.TERMINATED:011; 表示意思当前的线程已经清理完毕了。
现在我们对运行状态有了一个基本了解,就可以解释一下ctl所指代的两部分两部分表示的意思。之前我们知道,当前有效的线程数占据着一个int值的低29位,而线程池的运行状态占据着一个int值的高3位,这样就打包成为了ctl,也就是线程池的控制状态。
接下来,我们再来理解其他的变量和常量,因为只有理解了这部分变量和常量,后面才很好的理解。
//任务队列 private final BlockingQueue<Runnable> workQueue; //重入锁,用来控制操作原子性 private final ReentrantLock mainLock = new ReentrantLock(); //存放工作者的容器 private final HashSet<Worker> workers = new HashSet<Worker>(); //重入锁的条件对象 private final Condition termination = mainLock.newCondition(); //线程池中拥有过最大的线程数目<=maximumPoolSize private int largestPoolSize; //完成的任务数目 private long completedTaskCount; //产生线程的工厂 private volatile ThreadFactory threadFactory; //拒绝任务的处理器 private volatile RejectedExecutionHandler handler; //空闲线程保活的时间 private volatile long keepAliveTime; //是否允许空闲线程超时等待(等待keepAliveTime这么久) private volatile boolean allowCoreThreadTimeOut; //核心线程的数目 private volatile int corePoolSize; //允许线程最大的数目 private volatile int maximumPoolSize;
虽然我在上面都写了注释的,可能有些老哥还是不是很懂。不急,接下来,接下来详细的解释每个变量或者常量的意思,其中mainLock和termination我不解释,他们的作用是用来控制线程同步的,具体用法可以参考:Java并发编程:Lock。
//任务队列 private final BlockingQueue<Runnable> workQueue;
根据注释,我们知道这个常量是用来存储任务的队列。这里不做多的解释。
//拒绝任务的处理器 private volatile RejectedExecutionHandler handler;
上面的变量是一个拒绝任务的处理器。什么意思呢?当客户端提交一个任务到线程池来,线程池可能会拒绝接收他的任务,此时就需要一个策略来告诉客户。通常来说,分为四种策略,以下内容摘抄自Java并发编程:线程池的使用:
1.ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
2.ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
3.ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
4.ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
//空闲线程保活的时间 private volatile long keepAliveTime;
首先,需要说明该字段生效的前提是allowCoreThreadTimeOut字段为true。意思正如注释所说,如果一个线程空闲下来的话,可以让它保活keepAliveTime时间。
//核心线程的数目 private volatile int corePoolSize; //允许线程最大的数目 private volatile int maximumPoolSize;
接下来,我们来分析一下这两个size字段。首先,我们必须将这两个字段的意思分清。以下内容摘抄自Java并发编程:线程池的使用:
1.corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
2.maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
(3). ThreadPoolExecutor的execute方法
我们知道,当我们在Demo里面使用ThreadPoolExecutor的submit来提交一个任务,最终会回到execute方法:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
submit方法实际是在ThreadPoolExecutor的父类AbstractExecutorService中。我们还是看看execute方法,因为真正执行我们的任务在execute方法里面,这里我还是先将所有的代码贴出来,然后逐一分析。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
这里,我故意将官方的注释留下来,大家有兴趣的话可以阅读!不要怕阅读英文文档,连我这种4级从来没上过400分的都读得懂,所以大家都要相信自己!
从官方的解释中,我们知道execute方法分为3步,我们还是分为3步来分析。首先第一步:
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
首先,ctl在之前我们已经解释过了,它里面包含两个信息,一个是当前线程池的运行状态,一个是当前线程池的有效线程数目。这里的workerCountOf方法就是从ctl里面去获取线程的数目,大家有兴趣的话,可以自行的看看,这里就不再贴出源代码。
从这里,我们知道如果当前的线程数目小于核心线程数目,便调用addWorker方法,如果addWorker方法返回true,表示当前的任务执行开始;否则的话进入第三步。但是这里的addWorker方法是什么意思,之前我说过,我们先把worker当成一个线程,这里我们暂且将addWorker方法理解为为当前的任务创建一个线程。
这里,我不对Worker类进行展开,害怕将你们弄晕了。所以你们记得,将一个Worker对象理解成为一个线程。
这里我们需要注意一个小细节,那就是为什么需要反复的调用ctl.get()方法呢?不要将它想象的很复杂,我们就单纯的认为获取当前最新的控制状态。
然后再来看看第二步。
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
首先,我们先来分析分析进行第二步的条件:1.当前的线程数目大于核心线程数目;2.在第一步中,进行新建一个线程失败时,也会进行第二步。
我们发现,当前的任务直接进入了任务队列,当入队成功之后,如果此时线程池的控制状态不为RUNNNG时,就将这个任务从队列移除,并且调用reject方法来告诉客户,线程池我们拒绝你的任务;另一种情况下,当当前的线程数目为0时,调用addWorker方法,并且传入null,这里传入null的意思,这个意思在runWoker方法里面去讲解,这里说一下,null表示当前这个Woker的结束。大家可以去查询一下资料,实在抱歉!
最后,我们看看第三步。
else if (!addWorker(command, false)) reject(command);
如果addWorker(理解为新建一个线程)操作失败,就拒绝。但是这里我们发现了一个小细节,上面的addWorker方法第二参数传入的true,这里的是false,这是有什么区别吗?这个问题,我们需要在addWorker方法里面找答案。这里先不急,先对execute方法做一个总结。
1.当当前的线程数目小于核心线程数目时,会进行第一步,也就是为新来的任务新建一个线程。
2.当当前的线程数目大于等于核心线程数目时,或者在第一步中新建线程失败,会进行第二步,也就是将任务添加到任务队列中去。
3.当第一步和第二步都失败了的话,也就是说,为新任务新建线程和将新任务添加到任务队列都失败了的话,就进行第三步。从调用的方法中来看的话,应该也是新建一个线程,但是应该与第一步中不同的。
(3). ThreadPoolExecutor的addWorker方法
在execute方法里面,我们有两个疑问:1.addWorker到底是干嘛的?2.传入false和传入true有什么区别?
现在,让我们带着这两个疑问来看看addWorker方法。
我们先来看看第一段代码。
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
我们来的逐一分析,首先是:
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
可能有的老哥一开始就懵逼了,这个特么的判断语句是什么意思。根据我们在离散数学里面学习到的知识,我们来转换一下:
if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) return false;
由于上面的转换是数学的基础,这里不展开讲解。我们一一分析每个条件是什么意思。
1.rs >= SHUTDOWN && rs != SHUTDOWN,这个不就是rs为STOP、TIDYING或者TERMINATED嘛?这里表示的意思是,当线程池处于这种状态,不能新建新建一个线程了。
2.rs >= SHUTDOWN && rs == SHUTDOWN && firstTask != null,这里需要注意的是,要想比较firstTask是否为null,必须保证rs 为SHUTDOWN,这个是||运算的规则。这种情况下,也就是说,当线程池处于SHUTDOWN状态时,不能为一个不为null的任务新建线程。
2.rs >= SHUTDOWN && rs == SHUTDOWN && firstTask == null && workQueue.isEmpty()。当线程池处于SHUTDOWN状态下,并且传过来任务为null,这个null表示的是什么意思,我也不太懂,抱歉!
这里简单的做一个小总结:
1.当线程池处于STOP、TIDYING或者TERMINATED状态时,不再为提交的任何任务创建线程。
2.当线程池处于SHUTDOWN状态,不能为一个不为null的任务创建线程。
3.当线程池处于SHUTDOWN状态时,并且传入的任务为null,当当前的任务队列为空时,不能创建为新的任务创建任务。从这里,我们可以得出一个结论,如果当前的队列不为空时,可以传入一个为null的任务,这个有什么作用呢?我也不太清楚!实在抱歉!
分析完了什么情况下不能创建线程,我们再来看看,官方是怎么为我们创建的。
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }
此时,我们就可以将上面的一个问题解决了--传入false和传入true有什么区别?下面请看这段代码:
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
从这段代码中,我们可以看出来。当为true,如果当前线程数目大于核心数目,那么就创建失败;否则将当前的线程数目提升到最大线程数目去。
其实我们可以这样来理解,当任务来到时,如果当前的线程池里面核心线程数目还没有达到限制,就创建一个核心线程,并且让这个核心线程来执行这个任务,true就表示创建一个核心线程;如果当前的总线程数目(包括核心线程和非核心线程)达到限制的话,就不再为任务创建新的线程,false表示的就是非核心线程。
在这个代码中,还有一个retry可能比较难以理解,break后面加一个retry是什么意思。详细的大家可以看看《Java 核心技术卷I》74页。这种写法类似于C语言里面的goto写法,这里不进行解释。
上面的代码是判断当前线程池是否需要创建一个新的线程,如果需要的话,首先会是当前线程数目加1:
if (compareAndIncrementWorkerCount(c)) break retry;
接下来,就是正式为这个任务创建线程了。
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
以上的代码我觉得都比较简单了,这里就不在详细的讲解了,有可能此时有人对Worker还不是很了解,先不急,我们马上会解释Worker是什么,这里先把理解它成为一个线程。这段代码中需要注意几个小细节:
1.当一个线程添加到容器中时,马上回调用线程的start方法来启动线程。
2.如果线程启动失败的话,会调用addWorkerFailed方法将这个线程从线程容器中remove掉,在addWorkerFailed方法里面同时会将线程数目减1,并且调用tryTerminate方法,来尝试停止当前的线程池。
3.Worker
之前在看ThreadPoolExecutor类的时候,我们在进行一个任务执行时,通常是依靠Worker来执行的。那到底Woker到底是什么呢?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
这是Worker类的结构,发现它继承了AbstractQueuedSynchronizer类,同时实现了Runnable接口。我对AbstractQueuedSynchronizer类不是很熟悉,所以这里不对其进行分析,如果以后有机会的话,可以试着分析分析。这里我们注意的是Worker类实现了Runnable接口。
我们还是先来看看Woker类的字段吧。
final Thread thread; Runnable firstTask; volatile long completedTasks;
字段也是so easy。这里我们需要注意的是Woker本身为Runnable,但是它里面还有一个Thread,这个才是真正执行任务的类。
然后在再来看看Woker的构造方法。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
从构造方法里面可以看出来,一旦创建一个Woker对象,就会开启一个线程。我们这样来理解,每个Woker都绑定了一个Thread。这里需要注意的是,这个Thread的Runnable就是当前Woker对象,也就是说,如果我们在外面调用这个Thread的start方法,最终启动的这个Woker的run方法。
同时,我们还需要注意的是,setState是什么意思。这个state就涉及到AbstractQueuedSynchronizer里面的东西了,由于我不懂,所以也不好做出相应的解释。这里从官方的注释得到的信息是:这里设置-1的意思为了压制中断,直到Woker启动起来的时候才取消-1的状态,回到正常状态;其中0表示当前的Woker没有被独占,1表示当前的Woker被一个任务独占了。
还记得在addWoker方法里面,我们创建好了一个Woker对象之后,将它添加到容器当中,并且调用了start方法来启动这个这个线程吗?如下:
final Thread t = w.thread;
if (workerAdded) { t.start(); workerStarted = true; }
这里调用start方法就相当于调用了Woker的run方法。我们来看看这个run在执行以下什么。
public void run() { runWorker(this); }
额,好嘛,又调用了runWoker方法。由于这个runWoker属于TheadPoolExecutor类,这里先不进行分析。我们先暂且的认为这个方法就是让一个Woker正式的工作起来。现在,我们先对Woker类做一个小总结:
1.Woker本身实现了Runnable接口,本身不是一个线程。
2.Woker内部持有一个Thread的对象,在创建Woker对象,通过一个工厂来产生一个线程来供Woker使用。
3.外部使用Worker实际上是,通过Woker内部的Thread对象来执行的。
4.每个线程与一个Woker对象绑定,不是单独使用的。
4.又来看ThreadPoolExecutor
上面,我们对Woker类有了一个基本的了解。是不是觉得我把Woker类说的太简单了,我也觉得,感觉自己在浅尝辄止,没有深入学习。没有办法,涉及到深层次的内容我讲不出来,因为我菜啊!实在抱歉!
(1).runWorker方法
上面在分析Woker时,我们发现最终一个线程启动之后,调用到runWoker方法里面去。其实从这个方法的方法名我们就大概的知道这个方法有什么作用了,但是我们还是来看看runWoker方法的代码:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
上面是runWoker代码的主要代码。现在来逐一分析,我们先来看看这段代码:
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();
第一个判断语句非常好理解,当线程池的状态大于等于STOP状态是,中断Woker的线程。
第二个判断语句,我愣是看不懂是什么意思。这里我先解释这个三个条件是什么意思。Thread.interrupted() 方法其实就是判断当前的线程是否被中断,但是我们从从interrupted方法的源代码来看一下:
public static boolean interrupted() { return currentThread().isInterrupted(true); }
这里传入了一个true,需要注意。这段代码的意思就是,先获取当前线程的中断位置的值,然后重置(置为false),其中true表示的意思就是需要重置中断状态位。如果当前线程被中断了,同时此时线程池的状态大于等于STOP,并且当前线程被没有被中断(前面的interrupted方法会重置的),就中断Woker的线程。官方的注释这样说的,这样做主要是为了应付shutdownNow方法竞争的情况,我不太理解!
这里,我个人认为wt线程,也就是Woker的线程就是Thread.currentThread(),也就是当前线程。如果有错误的话,希望各位能够指正。
最后在执行task的run方法来执行这个任务。如果当前的任务执行完毕了,会调用getTask从任务队列去获取。如果getTask从任务队列里面获取的任务为null,表示任务队列已经为空,此时completedAbruptly变量会被置为false,表示线程意外的终止,最后会调用processWorkerExit方法来进行善后工作。
(2).getTask方法
当一个线程执行完毕了任务,不会直接退出run方法而退出线程。而是再次去getTask里面去获取新的任务来执行。让我们来看看getTask到底做了些什么。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
照着老规矩来,我们还是一点一点的分析。首先来看这段代码:
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
这个代码是非常的简单,先调用decrementWorkerCount方法来使线程数目减1,然后再返回null。还记在runWoker里面,当getTask返回一个null时,会怎么样吗?会退出while循环,并且将completedAbruptly标记为false,然后进入processWorkerExit方法进行善后,至于善后做了什么,待会再讲。
返回null的还有其他的情况,如果当前的线程数目大于最大线程数目,也会使得返回null,从而使得一个线程退出,这种操作相当于将线程池的线程数目归约到最大线程数目以内;还有就是当线程池的线程数目大于核心线程数目,在线程池允许超时的条件下,如果当一个空闲线程(任务队列里面没有了任务供线程来执行)超时时间到了,也会退出这个线程,这个相当于是,在超时时间到了,将线程池里面的线程往核心线程数目里面归约。
最后是从workQueue队列里面去任务。如果timed为true的话,首先会从workQueue里面去取任务,如果取到了任务,就立即返回;如果任务队列中没有任务,首先会这个线程等待keepAliveTime那么久的时间,如果还没有的话,就进行下一次循环。
如果timed为false的话,这种情况下allowCoreThreadTimeOut肯定为false,仍然也会去workQueue取任务,如果取到了就立即返回;反之,则无限期阻塞下去,这个跟Android的消息队列很类似,哈哈!
(3). processWorkerExit方法
我们知道,一旦getTask方法返回null,表示在runWoker会退出while循环,先将completedAbruptly置为false,然后进入processWorkerExit方法进行善后。现在让我们来看看这个善后方法在干嘛!嗯,这个方法比较简单,这里直接在代码进行注释
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果是意外终止,线程数目减1,为什么正常终止不减1呢 //因为在getTask进行了的 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //计算任务完成的总数目 completedTaskCount += w.completedTasks; //将worker从容器里面移除 workers.remove(w); } finally { mainLock.unlock(); } //尝试终止线程池 tryTerminate(); //下面的代码意思不难,但是想要实现的功能,我不太懂 //可以参考官方的注释 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
5.总结
由于这里方法调用层次比较多,这里我先贴一个流程图,再进行总结

楼主不是专业的,可能图绘制的有问题,大家请见谅!
下面对以上的知识做一个总结:
1.客户通过submit方法向线程池提交任务,线程池接收到这个任务,判断当前线程数是否小于核心线程数目,如果则为任务创建线程;如果大于,现尝试将任务添加到任务队列中;如果添加失败的话,则为这个任务添加创建线程,前提是当前线程数目小于最大线程数目。
2.创建线程的过程中在addWoker方法里面进行,如果创建成功,则调用start方法,使这个线程开始动起来;如果创建失败,将这个Woker对象从线程容器中移除。
3.每个线程调用runWoker方法,使得线程不断从任务队列中获取任务来解决。
4.当任务队列没有任务的时候,线程会阻塞在queue.take方法那里。
5.通常来说runWoker方法应该是死循环(不是一直在循环,会阻塞的),但是当在getTask返回null或者抛出了异常,会退出循环,进入processWokerExit方法进行回收操作。
原著是一个有趣的人,若有侵权,请通知删除
还没有人抢沙发呢~