本文带着以下问题1 2 3 4 5 6 7 ❶ execute方法里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢? ❷ 线程池收到Runnable任务紧就start执行了,为什么还要将任务放入集合(workers.add(w))呢? workers集合存在的意义是什么呢? ❸ workers 与 workerQueue 与 ctl的关系? ❹ getTask方法中为啥使用workQueue.poll(num, timeUnit) 和 take()阻塞方法呢,为啥不使用非阻塞方法呢? ❺ 线程池里存放的是线程吗? ❻ 线程池使用了两个锁lock,worker使用一个,reentrantLock使用一个,作用分别是什么? ❼ 线程池里怎么区分空闲线程和执行中线程?
理解ThreadPoolExecutor,甚至任何其他的类或组件,我觉得从两个点出发会更顺滑:他的数据结构和结构中的属性变化
所以,现在我们看下ThreadPoolExecutor的数据结构。如下代码1 2 3 4 5 6 7 private final BlockingQueue<Runnable> workQueue; private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private final HashSet<Worker> workers; class Worker final Thread thread; Runnable firstTask; volatile long completedTasks;
做个比喻,在一个森林中,有个超高级的城堡,城堡太大了,为了防止迷路,客人来的时候,自进门那一刻起,就自动给客人配备一辆车和一个保姆。
现在,线程池就是城堡,worker 就是那辆车,thread 就是那个保姆,task 就是那个客人。
他们的关系是,线程池会有0或多个 worker,每个worker配备一个 thread 线程和一个 task 任务。workers 是城堡里的客房。城堡太火爆了,装不下了,城堡为这些人提供了临时露营帐篷,workQueue就是那个露营帐篷。露营帐篷不是一般的帐篷,自带阻塞
ctl是一个巧妙的设计,既表示 workerCount 又表示 runState
本文将ThreadPoolExecutor高深的位运算转换为二进制,以便更直观的理解方法和属性的使用。对加入线程池,执行worker的线程,释放worker的线程,终止线程池等进行细致的理解,以求每个判断,每行代码都能理解。
状态 NOTE: ThreadPoolExecutor代码中的采用了高效的位运算,但阅读源码时不好直观理解,所以我将他们转成十进制,直观大家便于理解。
转换时借助以下方法 1、二进制 -> 十进制 Integer.parseInt(“00111100”, 2) 2、十进制 -> 二进制 Integer.toBinaryString(1)
ctl即是又是 既是workerCount:表示有效的线程数 又是runState: 表示线程池的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 /** * * <pre> * ctl即是又是 * workerCount:表示有效的线程数 * runState: 表示线程池的状态 * </pre> * <pre> * Run state is stored in the high-order bits; worker count is stored in the low-order bits. 解释如下: * | --- 高位 --- | --- 低位 --- | * | -536870912 --> 0 --> 536870912 | * | --- 线程状态 --- | --- 线程数量 --- | * 所以线程状态的变化轨迹是从 -536870912 开始递增,一直到 0;线程数量的变化轨迹是从0 开始递增,一直到 536870912 * 2. ctl直接等于 536870910,再定义两个线程,debug看效果 do it by practise * </pre> * * <pre> * ctl的初始值:-536870912 * ctl = ctlOf(RUNNING, 0): -536870912 -> 11100000000000000000000000000000 -> size: 32 * </pre> */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; /** * <pre> * (1 << COUNT_BITS) = (1 << 29) -> 536870912 -> 100000000000000000000000000000 -> size: 30 * CAPACITY = ((1 << 29) - 1) -> 536870911 -> 11111111111111111111111111111 -> size: 29 * ~CAPACITY = ~((1 << 29) - 1) -> -536870912 -> 11100000000000000000000000000000 -> size: 32 * </pre> */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; /** * NOTE: 这几个状态值是有数值顺序的,所以这几个状态值才可以进行大于、小于等操作 * * runState is stored in the high-order bits * <pre> * COUNT_BITS = 29 * RUNNING (-1 << 29): -536870912 -> 11100000000000000000000000000000 -> size: 32 * SHUTDOWN (0 << 29): 0 -> 0 -> -> size: 1 * STOP (1 << 29): 536870912 -> 100000000000000000000000000000 -> size: 30 * TIDYING (2 << 29): 1073741824 -> 1000000000000000000000000000000 -> size: 31 * TERMINATED (3 << 29): 1610612736 -> 1100000000000000000000000000000 -> size: 31 * </pre> * * 》》》重要知识点出现了:这几个状态值是有数值顺序的,所以这几个状态值才可以进行大于、小于等操作 》》》 * * 通过实践得出:这里的常量只是状态的边界值。换句话说,每个状态其实是一个范围,具体如下 * runState: ------- RUNNING -------- )[ ---------- SHUTDOWN --------- )[ ------------ STOP ---------- )[ ------------- TIDYING -------- )[ TERMINATED * 11100000000000000000000000000000 ~ 0 ~ 100000000000000000000000000000 ~ 1000000000000000000000000000000 ~ 1100000000000000000000000000000 ~ 无穷 */ private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
》》再次强调:runState的几个常量仅是状态的边界值。换句话说,每个状态其实是一个范围,
正是由于每个状态其实是一个范围,状态常量仅是状态范围的边界。介于次,状态方法的使用就简单明了了
状态方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 /** * 这个方法:返回负数说明线程状态是RUNNING;返回0说明线程状态是SHUTDOWN;理论上不返回正数 * * <pre> * 由于 * ~CAPACITY = ~((1 << 29) - 1) -> -536870912 -> 11100000000000000000000000000000 -> size: 32 * ctl = ctlOf(RUNNING,0) -> -536870912 -> 11100000000000000000000000000000 -> size: 32 * 所以 * c=ctl时,ctl & ~CAPACITY -> -536870912 & -536870912 -> 11100000000000000000000000000000 = -536870912, * 所以 * 随着ctl ++,runStateOf方法结果也是负数,并从-536870912开始递 +1,一直到 0,所以也可以是说负数表示线程状态是RUNNING(运行状态)时 * * 举例: * 当第一次ctl ++后,ctl -> -536870911 -> 11100000000000000000000000000001 * 此时,ctl & ~CAPACITY -> -536870911 & -536870912 -> 11100000000000000000000000000001 & 11100000000000000000000000000000 * NOTE: 当runStateOf等于0时,线程状态就变成了SHUTDOWN * </pre> */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * <pre> * 由于 * CAPACITY = (1 << 29) - 1 -> 536870911 -> 11111111111111111111111111111 -> size: 29 * ctl = ctlOf(RUNNING, 0) -> -536870912 -> 11100000000000000000000000000000 -> size: 32 * 所以 * c=ctl时,ctl & CAPACITY -> -536870912 & 536870911 -> 00000000000000000000000000000000 = 0 * 所以 * 随着ctl ++,所以workerCountOf方法结果从0开始递 +1,一直到 536870911 * </pre> */ private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的执行过程,这个网上说的很明白了
Tips => 正式开始前,再次强调 ctl其值为正数时表示线程数,其值为负数时表示线程状态
添加任务方法 - execute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void execute(Runnable command) { /* * Proceed in 3 steps: * 线程加入线程池的执行过程,见上图 */ int c = ctl.get(); // 当前线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 在addWorker里执行时,如果其他线程对线程池调用shutdown or shutdownNow or terminate and so on, // 那么addWorker返回false,从而走到这行代码 c = ctl.get(); } // =》isRunning(c) = c < SHUTDOWN =》记住一点:ctl小于0即是Running状态 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果其他线程对线程池调用shutdown or terminate相关方法,对于刚才加入队列的任务要删除调 if (! isRunning(recheck) && remove(command)) reject(command); // 假设线程池中只有一个运行着的线程:T1,当main线程走到这行代码时,T1运行完了,并对ctl执行了-1操作后就是0了,此时这行判断为true // 但是此时addWorker传入的任务是null,疑惑吗?这是因为代码执行到这时,任务task已经加入到workQueue队列了,而在runWorker方法中,如果worker的firstTask是null,那么会从workQueue队列里取任务task执行,所以此处传null给addWorker得以有机会执行t.start(),从而执行runWorker方法。__从这里看出一点:work count是不包括队列中任务的__ else if (workerCountOf(recheck) == 0) addWorker(null, false); // (1) } // 如果执行到这里,两种情况: // 1. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。使用最大线程数的逻辑 // 2. 线程池已经不是RUNNING状态,即c >= SHUTDOWN。那为什么还要走addWorker方法呢,我的理解是:这是作者代码精简的结果,addWorker方法有c >= SHUTDOWN的判断逻辑 else if (!addWorker(command, false)) reject(command); }
这里抛出一个问题(问题A):这里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢?先想想,文末一起说说
添加任务方法 - addWorker 记住一个前提:进入这个方法的条件是当前线程数<=核心线程数,或队列已满&当前线程数<=最大线程数。有了这个前提就好理解多了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池不是RUNNING状态时触发 // 且,(rs不是0 或 firstTask不是null 或 队列是空) // 返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 对c进行CAS操作,直到成功 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // (rs == SHUTDOWN && firstTask == null)这种情况仅仅适用于execute方法的(1)处情况 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //(1) workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker方法一共做了两件事:1.ctl递增;2.Worker对象加入workers集合并start Worker.thread线程(即线程池中的线程)。再具体点说,这个方法主要是 start Worker.thread线程。既然是这样,那么有两个疑问:
再问个问题(问题E):线程池(ThreadPoolExecutor)中存放的是线程吗?不是,是一堆的Worker对象,Worker既不是thread线程也不是要执行的任务。那么它是做啥的呢
我们来看下Worker的构造方法1 2 3 4 5 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker (1) inhibit interrupts:抑制中断 this.firstTask = firstTask; // (2) this.thread = getThreadFactory().newThread(this); // (3) }
setState(-1):包含了一个很重要的点:worker中的thread线程可以被中断的条件:aqs.state=0 从构造方法知道,我们要执行的任务成为Worker的一个字段,同时Worker还有一个thread字段,看Worker的(3)处代码,我觉得这行很关键,我改下它的同义写法:this.thread.target = this,即worker作为他自身的thread字段的值,从Worker的定义知道,Worker本身也是Runnable的。所以,当执行addWorker方法的(1)处t.start()时,我们的任务也跟着执行了,这个流程如图
明白了这里,Worker.run()和runWorker(Worker)怎么触发的就很容易理解了
执行任务方法 - runWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 Worker的thread字段值,执行thread.start()方法,触发了此方法的执行。 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // worker的lock用于区分线程是否空闲的 w.unlock(); // allow interrupts (b) boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // 这里为啥要加锁,结合 interruptIdleWorkers 方法一起思考?先透漏一点:w.lock() 将 w.state 设置为1,从而保证 w.thread 线程不能被中断 // 如果线程池正在停止,那么要保证当前线程是中断状态; // 如果不是的话,则要保证当前线程不是中断状态; // ctl > STOP if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 业务人员自己实现 beforeExecute(wt, task); try { task.run(); } catch (Throwable x) { throw new Error(x); } finally { // 业务人员自己实现 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } // completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。 completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
请注意一个细节: runWorker方法是在什么线程里执行?这有助于问题D的理解,线程池的一个线程中执行
runWorker方法的目的就是执行任务(即task.run())。它首先执行Worker的firstTask,然后再从workQueue队列里取task继续执行。简单来说,就是取任务 执行,取任务 执行,取任务 执行 。在这其中,怎么取,执行前中后会做什么事情,如ctl判断,线程中断检测,w.unlock()和w.lock()等都很重要
怎么取:这是getTask()方法的事情,稍后说
ctl判断:runStateAtLeast(ctl.get(), STOP) => 是否 ctl.get() > STOP,当线程被中断了,这个方法才返回true
w.unlock()和w.lock(),这两个方法操作是用于区分线程是否忙碌/空闲的?为什么呢
因为:runWorker中线程在执行任务时,线程是时刻被线程池管理和监控着呢。线程池可以视情况随时进行关闭(调用shutdown)、立刻关闭(调用shutdownNow)、终止(调用tryTerminate)。我们回头看线程池的状态,其中 SHUTDOWN:不接收新任务,继续处理队列的任务;STOP:不接收新任务,不处理队列的任务,中断正在进行的任务。SHUTDOWN状态下要实现的逻辑就是通过 worker 的 lock 实现的。
假设线程正在执行任务,此时线程池进行关闭,即调用 shutdown 方法。如果没有 worker.lock(),那么正执行任务的线程就会被打断,任务无法继续执行。这样SHUTDOWN状态的逻辑就无法完成了。
同时,w.unlock()和w.lock(),为什么说他们可区分线程是否忙碌/空闲的呢。试想,worker 关联的线程被 lock 了,说明正在执行任务呢。所以说此线程是忙碌的线程
注意一点:getTask 方法是在 lock 之外的。所以,阻塞着的线程就是空闲线程了
worker的lock用于区分线程是否空闲的。 结合shutdown()方法一起理解,见下文shutdown的部分
从while条件可知:null值对于runWorker()来说有特殊用途:通知获取任务的工作线程结束并退出,所以getTask方法返回null时是很特殊的
执行任务方法 - getTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 private Runnable getTask() { boolean timedOut = false; // 上一次的poll()的调用是否超时? for (;;) { int c = ctl.get(); // 这里需要记住文章开头的那些状态字段值,才反应的快些 // => c & ~CAPACITY int rs = runStateOf(c); /* * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: * 1. rs >= STOP,线程池是否正在stop; * 2. 阻塞队列是否为空。 * 如果以上条件满足,则整个判断条件为true。说明线程池突然终止, * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务 * * rs >= SHUTDOWN 说明当前线程池至少处于待关闭状态,不再接受新的任务 * rs >= STOP: 说明不需要在再处理任务了(即便任务在执行) * 所以,说明线程池在关闭,那就不执行任务task了 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 线程池要/正在关闭了,所以返回null,这样外层的runWorker方法就可以退出了。 // 所以代码走到这里含义是此任务的工作线程就要退出了。 // 相应的,ctl当然随之要减一 decrementWorkerCount(); return null; } // => c & CAPACITY int wc = workerCountOf(c); // timed:当前线程池中的线程数量是否超过了最大数量 或 核心线程允许超时 时为 true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // A:wc > maximumPoolSize什么场景下是true呢?答案是有人动态调整了最大线程数 // B:timeOut:poll超时 或 take被打断,再回到for循环时为 true;timed:如上 // C:wc > 1:线程数大于1 // D:workQueue.isEmpty():队列为空 /// 线程退出条件 // A C 为 true => 线程池里线程太多了,当前线程要退出了 // A D 为 true => 线程池里线程太多了,且 队列里没有任务了,当前线程要退出了 // B C 为 true => 线程池里线程数量大于核心线程数量,且 核心线程允许超时,当前线程要退出了 // B D 为 true => 线程池里线程数量大于核心线程数量,且 核心线程允许超时,且 队列里没有任务了,当前线程要退出了 // 所以线程数量减一。同时return null,意味着此工作线程要退出了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 此线程要退出了,ctl随之减一 if (compareAndDecrementWorkerCount(c)) return null; // 如果减1失败,则for回来重试 continue; } /// 程序第一次执行到这,B:timedOut:一定false // 如果A:false (C D是啥都行),那么当前线程池会阻塞,直到队列有任务或超时,然后再回到“线程退出条件”代码,依当前条件决定线程是否退出 try { // 这行语句表达了一个想法:线程只要是存活着的,他就应该执行任务。没有任务时,就等着有任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // (1) if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
这里有个疑问(问题D):getTask 方法的(1)处代码为什么要用阻塞的方法呢,不阻塞的方法不行吗,答案文末给出
addWorker 方法与 runWorker 方法就像是蓄水池中进水管和出水管,分析的时候需要两个结合的捋逻辑。而 getTask 方法是 runWorker方法中的核心逻辑。而 getTask 方法中的条件判断逻辑更核心。同时,分析的时候,与shutdown,shutdownNow,tryTerminate,awaitTermination等方法的逻辑相互结合在一起会更清晰
以下是关闭相关的方法 shutdown和shutdownNow方法比较相似,类比图如下
shutdown 调用shutdown()方法会进入 SHUTDOWN 状态。在 SHUTDOWN 状态下,线程池不接受新的任务,但是会继续执行任务队列中已有的任务。 怎么证明它此时不接收新的任务了呢,you need to do it by practise
通过调用shutdown()关闭的线程池,关闭以后表现的行为就是不能再提交任务给线程池,但是在关闭前已经提交的任务仍旧会被执行。等到任务队列空了以后线程池才会进入关闭流程1 2 3 4 5 6 7 8 9 10 11 12 13 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdown方法核心由三部分组成:
advanceRunState(SHUTDOWN):改线程池状态为SHUTDOWN
interruptIdleWorkers():中断线程池中空闲线程
tryTerminate():Transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)
advanceRunState 转换线程池状态为入参值,入参值只能是SHUTDOWN or STOP 如果是SHUTDOWN,那么执行完advanceRunState方法后ctl的值>=0,即>=SHUTDOWN。假如当前线程数是3,那么ctl就是3 如果是STOP,那么执行完advanceRunState方法后ctl的值>=536870912,即>=STOP。假如当前线程数是3,那么ctl就是536870912+3
所以,这也证实了线程池的状态是一个范围,而不是一个值,这个范围正如文档开头处所述1 2 3 4 5 6 7 8 private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
interruptIdleWorkers 方法很明确,就是将workers对应的线程中断。从方法的名称就可以知道功能是对空闲的线程中断。那怎么知道哪些work的线程是空闲的呢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
注意这点:w.tryLock(),为啥要试图加锁呢。这时候就要看看runWorker方法了,runWorker执行时是要对worker加锁的(即调用lock)。 所有工作中的线程都需要 lock 加锁(state从0变为1),所以在这里通过Worker.tryLock()来判断被检查的工作线程是否是空闲状态(试图将state从0变为1),空闲就可以发送interrupt()命令。所以,逻辑上看,w.lock => state从0变为1 => Worker.tryLock() 试图将state从0变为1,结果必然是false;结果上看,worker加锁了,就是工作状态,即不是空闲状态,就不能中断了
tryTerminate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 final void tryTerminate() { for (;;) { int c = ctl.get(); // 线程池为下面三种情况,直接return // 1.线程池为RUNNING状态,线程池还在运行中,不能终止 // 2.线程池为TIDYING或TERMINATED,因为线程池已经终止了,不用再终止了 // 3.线程池为SHUTDOWN状态 & 线程池队列不为空,队列里有任务,不能终止 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 代码走到这,说明此时线程池是STOP状态或(SHUTDOWN状态且线程池队列是空), 线程池里还有线程 if (workerCountOf(c) != 0) { // Eligible to terminate //这时候可能只有一个空闲线程了,它是在getTask方法中执行workQueue.take()了的线程,此线程属于空闲线程(在w.lock()外),它正在阻塞着等待着线程来呢。如果不执行中断会一直阻塞。你可能会说,在前面执行interruptIdleWorkers(false)方法时,会中断所有的空闲线程,这里重复执行了吧?试想下如果在执行interruptIdleWorkers(false)时恰好有个工作线程没有空闲,你刚执行完interruptIdleWorkers(false),那个线程就回到while里去调用了getTask方法,这时workQueue中没有任务了,就会调用workQueue.take()一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断那个空闲工作线程,避免在队列为空时取任务一直阻塞的情况 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将线程池状态设置为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 执行terminated后,将线程池状态设置为TERMINATED,线程池结束 ctl.set(ctlOf(TERMINATED, 0)); // 通知awaitTermination方法,线程池结束 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdownNow 调用shutdownNow()会进入 STOP 状态。在 STOP 状态下线程池既不接受新的任务,也不处理已经在队列中的任务。对于还在执行任务的工作线程,线程池会发起中断请求来中断正在执行的任务,同时会清空任务队列中还未被执行的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow方法核心由四部分组成:
advanceRunState(STOP):改线程池状态为STOP,与advanceRunState(SHUTDOWN)逻辑相同
interruptWorkers():中断线程池中所有线程,这个与interruptIdleWorkers的区别细细体会,这个方法中断所有已经启动的工作线程,即进行中的任务(执行了w.lock,但还没执行w.unlock),这些线程中断可能成功也可能不成功
tryTerminate():前面已说完
drainQueue():从任务队列中取出所有未被执行的任务,未被执行的任务列表会被作为返回值返回给应用程序
interruptWorkers 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 // Interrupts all threads, even if active private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
有个疑问:interruptWorkers是中断线程池中所有的线程(空闲的和执行中的总和),但interruptIfStarted()方法只是中断执行中的线程。如果你有这个疑惑的话,咱们一起看下getState() >= 0这个判断,我们看下runWorker方法,先执行了w.unlock(),再执行w.lock(),在执行w.unlock(),unlock是把state设置为0,lock把state设置为1,又只要执行了runWorker,那么state的值就是>=等于0的了,所以不管空闲与否,state总是>=0,所以interruptWorkers方法这时候执行interruptIfStarted方法,中断的就是所有的线程
drainQueue 将workerQueue队列里的worker返回1 2 3 4 5 6 7 8 9 10 11 12 private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
问答 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 ❶ execute方法里为啥要用workQueue.offer(command)这个非阻塞方法呢,而不用put等阻塞方法呢? answer: 以为execute方法是运行在main线程里的,如果使用阻塞方法,那么后面的任务就无法添加到线程池了 ❷ 线程池收到Runnable任务紧就start执行了,为什么还要将任务放入集合(workers.add(w))呢? workers集合存在的意义是什么呢? answer: 放入workers是为了保存当时的thread和worker,不然后面怎么对worker和thread进行加锁和中断啊,addWorker和runWorker本来就是并行的关系,还要时刻监视着shutdown,shutdownNow,terminate之类的动作 ❸ workers 与 workerQueue 与 ctl的关系 workers的size应该=ctl.get;workerQueue与ctl没有关系 ❹ getTask方法中为啥使用workQueue.poll(num, timeUnit) 和 take()方法,为啥不使用非阻塞方法呢 answer: getTask里有一个for自旋,一直找任务执行,如果不使用阻塞方法,那么for自旋将一直占着cpu,这个原理和synchronized的升级原理是一样的 ❺ 线程池里存放的是线程吗 answer: 不是,是Worker,Worker是线程池中的线程和任务task之间的纽带 ❻ 线程池使用了两个锁lock,worker使用一个,reentrantLock使用一个,作用分别是什么? answer: worker的lock是属于worker的,是为了区分线程是否为空闲还是运行中 reentrantLock mainLock 是属于线程池的,是为了实现线程池中各方法之间保持不竞争的 ❼ 线程池里怎么区分空闲线程和执行中线程 answer: worker的thread是否被lock 两个发散性的问题: ❽ 线程池中如何使用ThreadLocal ❾ ThreadPoolExecutor运行在多线程环境中会怎样的
小结 对ThreadPoolExecutor的理解每次都会有新的收获,看似不经意的一行代码,一个判断,实践懂了之后都感叹和震撼作者的编程能力,每次领悟之后,都感觉作者的实现技巧都开启了我以前没见过的窗。
ThreadPoolExecutor属性源码和自己debug实践的例子:ThreadPoolExecutorTest0
参考 深入理解Java线程池:ThreadPoolExecutor
ThreadPoolExecutor实现原理
java-threadpoolexecutor