理解 Java 线程池 ThreadPoolExecutor

使用 JDK 5 的线程池实现有近 20 年的时间了,快速创建一个线程池经常是调用 Executors 中的工厂方法。但是涉及过更精细的线程池管理控制时不得不用 ThreadPoolExecutor 的构造方法,这也就是为什么有些公司不建议用 Executors 的工厂方法创建线程池,而应该直接创建 ThreadPoolExecutor 或 ForkJoinPool 实例。 

例如代码
ExecutorService threadPool = Executors.newFixedThreadPool(10);
实际上调用的是
new ThreadPoolExecutor(nThreads, nThreads,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>());
前两个参数 corePoolSize 和 maximumPoolSize 是一样的; OL, TimeUnit.MILLISECONDS 表示线程创建后只要线程池还在就是永生的; workQueue 是一个大小为 Integer.MAX_VALUE 的队列, 几乎可以无限提交任务,耗尽内存

不建议用 Executors 的工厂方法的原因大致有二:
  1. 创建的 ExecutorService 类型没有提供访问线程池内部状态的方法,不过愿意转型的话
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    这个不是问题
  2. ThreadPoolExecutor 有比 Executors 工厂方法更丰富的构造参数,Executors.newFixedThreadPool(10) 满足不了更复杂的情形。巨大的 workQueue(new LinedBlockingQueue<Runnable>()) 在提交大量任务时会撑爆内存; 自定义线程名称,行为; 无法提交任务时的处理方式等

ThreadPoolExecutor 的完整构建函数是
1public ThreadPoolExecutor(int corePoolSize,
2                          int maximumPoolSize,
3                          long keepAliveTime,
4                          TimeUnit unit,
5                          BlockingQueue<Runnable> workQueue,
6                          ThreadFactory threadFactory,
7                          RejectedExecutionHandler handler)

本文通过调试, 感性的方式来理解创建 ThreadPoolExecutor 实例的各个参数的含义,如果想要理性些就应该阅读它的源代码。

新线程池不会预先创建线程 

1public static void main(String[] args) {
2    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
3    System.out.println(threadPool);
4}

默认的 ThreadFactory 是 Executors.defaultThreadFactory(), 默认的 RejectedExecutionHandler 是 new ThreadPoolExecutor.AbortPolicy()

输出为
java.util.concurrent.ThreadPoolExecutor@2cfb4a64[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
上面代码运行后线程池中无线程,代码能正常退出,无需调用 threadPool.shutdown()

提交第一个任务只创建一个线程

往下测试时添加一个辅助任务方法
 1private static void runTask(int sleepInSeconds, int taskNum) {
 2    try {
 3        System.out.printf("[%s] %s start task: %s\n", Thread.currentThread().getName(), LocalTime.now(), taskNum);
 4        Thread.sleep(sleepInSeconds * 1000);
 5        System.out.printf("[%s] %s end task: %s\n", Thread.currentThread().getName(), LocalTime.now(), taskNum);
 6
 7    } catch (InterruptedException e) {
 8        throw new RuntimeException(e);
 9    }
10}

添加一个任务
1public static void main(String[] args) {
2    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
3    threadPool.submit(() -> runTask(10, 1));
4    System.out.println(threadPool);
5}

提交的一个任务将会在 10 秒后结束,这时候打印输出的是
[pool-1-thread-1] 00:23:04.344 start task: 1
java.util.concurrent.ThreadPoolExecutor@57829d67[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[pool-1-thread-1] 00:23:14.359 end task: 1
说明线程池中创建了一个线程,并且程序不会正常退出,原因是线程池中创建了一个 daemon 为 false 的线程,它会阻止当前进程(主线程)的退出,只有在调用 threadPool.shutdown() 或 threadPool.shutdownNow() 等其中的任务完成后才会退出进程。

ThreadPoolExecutor 的 shutdown() 和  shutdownNow() 的执行效果不在本文讲述之列。

综合测试观察任务数与线程池, 等待队列的关系

接下来是通过输入参数来控制提供的任务数,从而观察线程的内部状态
 1public static void main1(String[] args) throws InterruptedException {
 2    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
 3
 4    int numberOfTasks = Integer.parseInt(args[0]);
 5    IntStream.rangeClosed(1, numberOfTasks).forEach(i ->
 6            threadPool.submit(() -> runTask(10, i))
 7    );
 8    TimeUnit.SECONDS.sleep(1);
 9
10    System.out.println(threadPool);
11    threadPool.shutdown();
12    threadPool.awaitTermination(1, TimeUnit.HOURS);
13    System.out.println(threadPool);
14}

每个任务的执行时长为 10 秒,因此所有任务提交进线程池 2 秒后都没有任务完成

如果参数为 2 时输出为
[pool-1-thread-1] 00:23:37.858 start task: 1
[pool-1-thread-2] 00:23:37.858 start task: 2
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[pool-1-thread-2] 00:23:47.871 end task: 2
[pool-1-thread-1] 00:23:47.871 end task: 1
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
下面是改变 numberOfTasks 时更多的测试结果(第一个 System.out.println(threadPool) 中的 pool size, active threads, 和 queued tasks)

numberOfTaskspool sizeactive threads
(工作线程数)
queued tasks开始执行
的任务号
说明, 创建线程池的参数为
(core: 2, max: 5, workQueue size: 10)
0000无任务时不创建线程,不存 daemon 为 false  的线程,所以程序能正常退出
11101任务数为 1 时创建相应的一个线程
22201,2任务数为 2 时创建两个线程,这时线程数到达一个 core 指定的 临界值
32211,2任务数为 3, 介于 core: 2 和 max: 5 之间,只创建 core 指定的 2 个线程, 额外任务放到 workQueue 队列中
42221,2与上同,保持 core 数值指定的 2 个线程,额外任务陆续放到 workQueue 队列中
52231,2
62241,2
72251,2
82261,2
92271,2
102281,2
112291,2
1222101,2此时达到 workQueue 队列已满的 临界值
1333101,2,13workQueue 队列已满,增加一个线程直接执行当前新添加的任务
1444101,2,13,14workQueue 队列已满,继续增加一个线程直接执行当前新添加的任务
1555101,2,13,14,15workQueue 队列已满,增加增加一个线程直接执行当前新添加的任务,  这时线程数量达到 max 值. workQueue 与线程数都达到 临界值
1655101,2,13,14,15提交 16 号任务时失败, 但已提交的任务仍能被执行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d6e8792 rejected from java.util.concurrent.ThreadPoolExecutor@2812cbfa[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]



线程数达到 max 值,并且在 workQueue 队列已满时,新提交的任务触发 RejectedExecutionHandler

从以上的测试结果,我们不难理解构建 ThreadPoolExecutor 时的 corePoolSize, maximumPoolSize, workQueue 大小和提交任务数时的关系
  1. 有新任务提交后才会创建线程
  2. 提交新任务时,如果空闲线程少于 corePoolSize, 则创建新线程执行任务
  3. 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 未满时,则新任务放到 workQueue 中
  4. 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 已满时,同时 maximumPoolSize 大于 corePoolSize, 则创建新线程,且用新创建的线程执行此时提交的任务
  5. 提交新任务时,如果线程数达到 maximumPoolSize 大小,并且 workQueue 已满时,无法提交任务,触发 RejectedExecutionHandler

如果用一张动图来展示提交任务的过程(workers 即当前线程池中的线程数)

(假设执行中的任务都很忙)


注:以上测试没有揭示出某个任务执行完毕后,有新任务提交的情形。有兴趣的话可以对此进行测试,最后会发现 ThreadPoolExecutor 决定是否创建新线程是基于当前空闲线程的数量,有空闲线程则重用而不倾向于创建新的线程。

由于只有在 workQueue 爆满后线程池大小才会从 corePoolSize 增长至 maximumPoolSize, 因此我们用一个巨大的 workQueue 时,极有可能在内存耗尽时线程池的大小仍然是 corePoolSize 大小。如下机这样的写法
1new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

只有在 LinkedBlockingQueue() 满了之后,线程池大小才会大于 2,往 5 方向生长。

关于构造 ThreadPoolExecutor 时的 keepAliveTime/unit 的用途

如是 keepAliveTime 非零时,当线程数大于  corePoolSize, 某个线程在空闲多久时间后结束自己,最后线程数回退到 corePoolSize 大小。以下是测试
 1public static void main1(String[] args) throws InterruptedException {
 2    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
 3
 4    IntStream.rangeClosed(1, 14).forEach(i -> threadPool.submit(() -> runTask(6, i)));
 5    System.out.println(threadPool);   // 1
 6
 7    while (threadPool.getCompletedTaskCount() != 14){
 8    }
 9
10    System.out.println(threadPool);   // 2
11
12    TimeUnit.SECONDS.sleep(10);
13    System.out.println(threadPool);   // 3
14}

保证在执行完最后一个任务时有线程空闲已超过 3 秒,所以输出为
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0]         // 1
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14]         // 2
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14]         // 3
corePoolSize 比 maxiumPoolSize 小且 keepAliveTime 不为 0 时,超过 corePoolSize 部分的线程才有机会自我消亡。我们可以考虑不同的 corePoolSize,maximumPool 的设置来处理异常的高并发,并能结束长期不用的线程来释放相应的资源 -- 内存和文件句柄

关于 RejectedExecutionHandler

RejectedExecutionHandler 在 workQueue 已满,线程数达到 maximumPoolSize 并且都很忙的时候触发,默认的 RejectedExecutionHandler 是 AbortPolicy(), 即直接丢弃新提交的任务,这可能不是我们想要的,如果 workQueue 足够大不会是个问题。另外 Java 提供了其他几个 RejectedExecutionHandler 实现
  1. DiscardOldestPolicy: 移除一个最旧的未开始处理的任务并重试提交新任务。会造成连续移除旧的任务
  2. DiscardPolicy: 安静无异常的丢弃无法提交的新任务。会造成连续的丢弃新任务
  3. CallerRunsPolicy: 无法提交任务的话就能提交任务的线程执行,反正闲着也是闲着。在 Caller 执行新任务时也就无法提交后续任务,不会造成连续失败。这是一个配合有限 workQueue 队列的好办法。

ThreadPoolExecutor 从 workQueue(BlockingQueue) 中获取任务是调用它的 take() 方法,无任务则阻塞, 等待。在需要往 workQueue 中添加任务时是调用 offer(Runnable) 方法,而 offer() 是能往 workQueue 中添加任务则返回 true, 否则返回 false 并触发 RejectedExecutionHandler, 无异常,也不等待。但 BlockingQueue 的 put() 方法会等待啊,所以曾经使用了下面的方式来使用有限的 workQueue
 1BlockingQueue<Runnable> workQueue =  new LinkedBlockingQueue<Runnable>(10) {
 2    @Override
 3    public boolean offer(Runnable runnable) {
 4        try {
 5            this.put(runnable);
 6        } catch (InterruptedException e) {
 7            throw new RuntimeException(e);
 8        }
 9        return true;
10    }
11};
12
13ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, workQueue);

覆盖 offer() 方法,转换为无限等待的 put() 方法,或者可以是有等待期限的 offer(runnable, timeout, unit) 方法。

在 workQueue 已满时再提交任务的话会让提交任务的线程(Caller)一味的阻塞,无所作为,与 CallerRunsPolicy() 相比确实是浪费了 Caller 的资源。不过 CallerRunsPolicy 会有个问题,假如它在执行任务(一个重型任务)过程中,线程池里早先的任务已经执行完毕,释放了线程,但 Caller 被占用着无法及时提交更多的新任务,造成更多的线程浪费。

参考:
  1. JUC线程池: ThreadPoolExecutor详解
永久链接 https://yanbin.blog/understand-java-thread-pool-executor/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。