理解 Java 线程池 ThreadPoolExecutor

使用 JDK 5 的线程池实现有近 20 年的时间了,快速创建一个线程池经常是调用 Executors 中的工厂方法。但是涉及过更精细的线程池管理控制时不得不用 ThreadPoolExecutor 的构造方法,这也就是为什么有些公司不建议用 Executors 的工厂方法创建线程池,而应该直接创建 ThreadPoolExecutor 或 ForkJoinPool 实例。 

例如代码

ExecutorService threadPool = Executors.newFixedThreadPool(10);

实际上调用的是

new ThreadPoolExecutor(nThreads, nThreads,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>());

前两个参数 corePoolSize 和 maximumPoolSize 是一样的; OL, TimeUnit.MILLISECONDS 表示线程创建后只要线程池还在就是永生的; workQueue 是一个大小为 Integer.MAX_VALUE 的队列, 几乎可以无限提交任务,耗尽内存

不建议用 Executors 的工厂方法的原因大致有二:

  1. 创建的 ExecutorService 类型没有提供访问线程池内部状态的方法,不过愿意转型的话
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    这个不是问题
  2. ThreadPoolExecutor 有比 Executors 工厂方法更丰富的构造参数,Executors.newFixedThreadPool(10) 满足不了更复杂的情形。巨大的 workQueue(new LinedBlockingQueue<Runnable>()) 在提交大量任务时会撑爆内存; 自定义线程名称,行为; 无法提交任务时的处理方式等

ThreadPoolExecutor 的完整构建函数是

本文通过调试, 感性的方式来理解创建 ThreadPoolExecutor 实例的各个参数的含义,如果想要理性些就应该阅读它的源代码。

新线程池不会预先创建线程 

默认的 ThreadFactory 是 Executors.defaultThreadFactory(), 默认的 RejectedExecutionHandler 是 new ThreadPoolExecutor.AbortPolicy()

输出为

java.util.concurrent.ThreadPoolExecutor@2cfb4a64[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

上面代码运行后线程池中无线程,代码能正常退出,无需调用 threadPool.shutdown()

提交第一个任务只创建一个线程

往下测试时添加一个辅助任务方法

添加一个任务

提交的一个任务将会在 10 秒后结束,这时候打印输出的是

[pool-1-thread-1] 00:23:04.344 start task: 1
java.util.concurrent.ThreadPoolExecutor@57829d67[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[pool-1-thread-1] 00:23:14.359 end task: 1

说明线程池中创建了一个线程,并且程序不会正常退出,原因是线程池中创建了一个 daemon 为 false 的线程,它会阻止当前进程(主线程)的退出,只有在调用 threadPool.shutdown() 或 threadPool.shutdownNow() 等其中的任务完成后才会退出进程。

ThreadPoolExecutor 的 shutdown() 和  shutdownNow() 的执行效果不在本文讲述之列。

综合测试观察任务数与线程池, 等待队列的关系

接下来是通过输入参数来控制提供的任务数,从而观察线程的内部状态

每个任务的执行时长为 10 秒,因此所有任务提交进线程池 2 秒后都没有任务完成

如果参数为 2 时输出为

[pool-1-thread-1] 00:23:37.858 start task: 1
[pool-1-thread-2] 00:23:37.858 start task: 2
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[pool-1-thread-2] 00:23:47.871 end task: 2
[pool-1-thread-1] 00:23:47.871 end task: 1
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]

下面是改变 numberOfTasks 时更多的测试结果(第一个 System.out.println(threadPool) 中的 pool size, active threads, 和 queued tasks)

numberOfTasks pool size active threads
(工作线程数)
queued tasks 开始执行
的任务号
说明, 创建线程池的参数为
(core: 2, max: 5, workQueue size: 10)
0 0 0 0 无任务时不创建线程,不存 daemon 为 false  的线程,所以程序能正常退出
1 1 1 0 1 任务数为 1 时创建相应的一个线程
2 2 2 0 1,2 任务数为 2 时创建两个线程,这时线程数到达一个 core 指定的 临界值
3 2 2 1 1,2 任务数为 3, 介于 core: 2 和 max: 5 之间,只创建 core 指定的 2 个线程, 额外任务放到 workQueue 队列中
4 2 2 2 1,2 与上同,保持 core 数值指定的 2 个线程,额外任务陆续放到 workQueue 队列中
5 2 2 3 1,2
6 2 2 4 1,2
7 2 2 5 1,2
8 2 2 6 1,2
9 2 2 7 1,2
10 2 2 8 1,2
11 2 2 9 1,2
12 2 2 10 1,2 此时达到 workQueue 队列已满的 临界值
13 3 3 10 1,2,13 workQueue 队列已满,增加一个线程直接执行当前新添加的任务
14 4 4 10 1,2,13,14 workQueue 队列已满,继续增加一个线程直接执行当前新添加的任务
15 5 5 10 1,2,13,14,15 workQueue 队列已满,增加增加一个线程直接执行当前新添加的任务,  这时线程数量达到 max 值. workQueue 与线程数都达到 临界值
16 5 5 10 1,2,13,14,15 提交 16 号任务时失败, 但已提交的任务仍能被执行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d6e8792 rejected from java.util.concurrent.ThreadPoolExecutor@2812cbfa[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]

线程数达到 max 值,并且在 workQueue 队列已满时,新提交的任务触发 RejectedExecutionHandler

从以上的测试结果,我们不难理解构建 ThreadPoolExecutor 时的 corePoolSize, maximumPoolSize, workQueue 大小和提交任务数时的关系

  1. 有新任务提交后才会创建线程
  2. 提交新任务时,如果空闲线程少于 corePoolSize, 则创建新线程执行任务
  3. 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 未满时,则新任务放到 workQueue 中
  4. 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 已满时,同时 maximumPoolSize 大于 corePoolSize, 则创建新线程,且用新创建的线程执行此时提交的任务
  5. 提交新任务时,如果线程数达到 maximumPoolSize 大小,并且 workQueue 已满时,无法提交任务,触发 RejectedExecutionHandler

如果用一张动图来展示提交任务的过程(workers 即当前线程池中的线程数)

(假设执行中的任务都很忙)

注:以上测试没有揭示出某个任务执行完毕后,有新任务提交的情形。有兴趣的话可以对此进行测试,最后会发现 ThreadPoolExecutor 决定是否创建新线程是基于当前空闲线程的数量,有空闲线程则重用而不倾向于创建新的线程。

由于只有在 workQueue 爆满后线程池大小才会从 corePoolSize 增长至 maximumPoolSize, 因此我们用一个巨大的 workQueue 时,极有可能在内存耗尽时线程池的大小仍然是 corePoolSize 大小。如下机这样的写法

只有在 LinkedBlockingQueue() 满了之后,线程池大小才会大于 2,往 5 方向生长。

关于构造 ThreadPoolExecutor 时的 keepAliveTime/unit 的用途

如是 keepAliveTime 非零时,当线程数大于  corePoolSize, 某个线程在空闲多久时间后结束自己,最后线程数回退到 corePoolSize 大小。以下是测试

保证在执行完最后一个任务时有线程空闲已超过 3 秒,所以输出为

java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]         // 1
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14]         // 2
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14]         // 3

corePoolSize 比 maxiumPoolSize 小且 keepAliveTime 不为 0 时,超过 corePoolSize 部分的线程才有机会自我消亡。我们可以考虑不同的 corePoolSize,maximumPool 的设置来处理异常的高并发,并能结束长期不用的线程来释放相应的资源 -- 内存和文件句柄

关于 RejectedExecutionHandler

RejectedExecutionHandler 在 workQueue 已满,线程数达到 maximumPoolSize 并且都很忙的时候触发,默认的 RejectedExecutionHandler 是 AbortPolicy(), 即直接丢弃新提交的任务,这可能不是我们想要的,如果 workQueue 足够大不会是个问题。另外 Java 提供了其他几个 RejectedExecutionHandler 实现

  1. DiscardOldestPolicy: 移除一个最旧的未开始处理的任务并重试提交新任务。会造成连续移除旧的任务
  2. DiscardPolicy: 安静无异常的丢弃无法提交的新任务。会造成连续的丢弃新任务
  3. CallerRunsPolicy: 无法提交任务的话就能提交任务的线程执行,反正闲着也是闲着。在 Caller 执行新任务时也就无法提交后续任务,不会造成连续失败。这是一个配合有限 workQueue 队列的好办法。

ThreadPoolExecutor 从 workQueue(BlockingQueue) 中获取任务是调用它的 take() 方法,无任务则阻塞, 等待。在需要往 workQueue 中添加任务时是调用 offer(Runnable) 方法,而 offer() 是能往 workQueue 中添加任务则返回 true, 否则返回 false 并触发 RejectedExecutionHandler, 无异常,也不等待。但 BlockingQueue 的 put() 方法会等待啊,所以曾经使用了下面的方式来使用有限的 workQueue

覆盖 offer() 方法,转换为无限等待的 put() 方法,或者可以是有等待期限的 offer(runnable, timeout, unit) 方法。

在 workQueue 已满时再提交任务的话会让提交任务的线程(Caller)一味的阻塞,无所作为,与 CallerRunsPolicy() 相比确实是浪费了 Caller 的资源。不过 CallerRunsPolicy 会有个问题,假如它在执行任务(一个重型任务)过程中,线程池里早先的任务已经执行完毕,释放了线程,但 Caller 被占用着无法及时提交更多的新任务,造成更多的线程浪费。

参考:

  1. JUC线程池: ThreadPoolExecutor详解

本文链接 https://yanbin.blog/understand-java-thread-pool-executor/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

1 Comment
Inline Feedbacks
View all comments
trackback

[…] 这就是在之前 理解 Java 线程池 ThreadPoolExecutor 一文中介绍的实现代码,代码是 […]