使用 JDK 5 的线程池实现有近 20 年的时间了,快速创建一个线程池经常是调用 Executors 中的工厂方法。但是涉及过更精细的线程池管理控制时不得不用 ThreadPoolExecutor 的构造方法,这也就是为什么有些公司不建议用 Executors 的工厂方法创建线程池,而应该直接创建 ThreadPoolExecutor 或 ForkJoinPool 实例。
例如代码
ExecutorService threadPool = Executors.newFixedThreadPool(10);
实际上调用的是
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
前两个参数 corePoolSize 和 maximumPoolSize 是一样的; OL, TimeUnit.MILLISECONDS 表示线程创建后只要线程池还在就是永生的; workQueue 是一个大小为 Integer.MAX_VALUE 的队列, 几乎可以无限提交任务,耗尽内存
不建议用 Executors 的工厂方法的原因大致有二:
- 创建的 ExecutorService 类型没有提供访问线程池内部状态的方法,不过愿意转型的话
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
这个不是问题
- ThreadPoolExecutor 有比 Executors 工厂方法更丰富的构造参数,Executors.newFixedThreadPool(10) 满足不了更复杂的情形。巨大的 workQueue(new LinedBlockingQueue<Runnable>()) 在提交大量任务时会撑爆内存; 自定义线程名称,行为; 无法提交任务时的处理方式等
ThreadPoolExecutor 的完整构建函数是
1 2 3 4 5 6 7 |
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) |
本文通过调试, 感性的方式来理解创建 ThreadPoolExecutor 实例的各个参数的含义,如果想要理性些就应该阅读它的源代码。
新线程池不会预先创建线程
1 2 3 4 |
public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); System.out.println(threadPool); } |
默认的 ThreadFactory 是 Executors.defaultThreadFactory(), 默认的 RejectedExecutionHandler 是 new ThreadPoolExecutor.AbortPolicy()
输出为
java.util.concurrent.ThreadPoolExecutor@2cfb4a64[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
上面代码运行后线程池中无线程,代码能正常退出,无需调用 threadPool.shutdown()
提交第一个任务只创建一个线程
往下测试时添加一个辅助任务方法
1 2 3 4 5 6 7 8 9 10 |
private static void runTask(int sleepInSeconds, int taskNum) { try { System.out.printf("[%s] %s start task: %s\n", Thread.currentThread().getName(), LocalTime.now(), taskNum); Thread.sleep(sleepInSeconds * 1000); System.out.printf("[%s] %s end task: %s\n", Thread.currentThread().getName(), LocalTime.now(), taskNum); } catch (InterruptedException e) { throw new RuntimeException(e); } } |
添加一个任务
1 2 3 4 5 |
public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); threadPool.submit(() -> runTask(10, 1)); System.out.println(threadPool); } |
提交的一个任务将会在 10 秒后结束,这时候打印输出的是
[pool-1-thread-1] 00:23:04.344 start task: 1
java.util.concurrent.ThreadPoolExecutor@57829d67[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[pool-1-thread-1] 00:23:14.359 end task: 1
说明线程池中创建了一个线程,并且程序不会正常退出,原因是线程池中创建了一个 daemon 为 false 的线程,它会阻止当前进程(主线程)的退出,只有在调用 threadPool.shutdown() 或 threadPool.shutdownNow() 等其中的任务完成后才会退出进程。
ThreadPoolExecutor 的 shutdown() 和 shutdownNow() 的执行效果不在本文讲述之列。
综合测试观察任务数与线程池, 等待队列的关系
接下来是通过输入参数来控制提供的任务数,从而观察线程的内部状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public static void main1(String[] args) throws InterruptedException { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); int numberOfTasks = Integer.parseInt(args[0]); IntStream.rangeClosed(1, numberOfTasks).forEach(i -> threadPool.submit(() -> runTask(10, i)) ); TimeUnit.SECONDS.sleep(1); System.out.println(threadPool); threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); System.out.println(threadPool); } |
每个任务的执行时长为 10 秒,因此所有任务提交进线程池 2 秒后都没有任务完成
如果参数为 2 时输出为
[pool-1-thread-1] 00:23:37.858 start task: 1
[pool-1-thread-2] 00:23:37.858 start task: 2
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[pool-1-thread-2] 00:23:47.871 end task: 2
[pool-1-thread-1] 00:23:47.871 end task: 1
java.util.concurrent.ThreadPoolExecutor@19dfb72a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
下面是改变 numberOfTasks 时更多的测试结果(第一个 System.out.println(threadPool) 中的 pool size, active threads, 和 queued tasks)
numberOfTasks | pool size |
active threads (工作线程数) |
queued tasks | 开始执行 的任务号 |
说明, 创建线程池的参数为 (core: 2, max: 5, workQueue size: 10) |
0 | 0 | 0 | 0 | 无 | 无任务时不创建线程,不存 daemon 为 false 的线程,所以程序能正常退出 |
1 | 1 | 1 | 0 | 1 | 任务数为 1 时创建相应的一个线程 |
2 | 2 | 2 | 0 | 1,2 | 任务数为 2 时创建两个线程,这时线程数到达一个 core 指定的 临界值 |
3 | 2 | 2 | 1 | 1,2 | 任务数为 3, 介于 core: 2 和 max: 5 之间,只创建 core 指定的 2 个线程, 额外任务放到 workQueue 队列中 |
4 | 2 | 2 | 2 | 1,2 | 与上同,保持 core 数值指定的 2 个线程,额外任务陆续放到 workQueue 队列中 |
5 | 2 | 2 | 3 | 1,2 | |
6 | 2 | 2 | 4 | 1,2 | |
7 | 2 | 2 | 5 | 1,2 | |
8 | 2 | 2 | 6 | 1,2 | |
9 | 2 | 2 | 7 | 1,2 | |
10 | 2 | 2 | 8 | 1,2 | |
11 | 2 | 2 | 9 | 1,2 | |
12 | 2 | 2 | 10 | 1,2 | 此时达到 workQueue 队列已满的 临界值 |
13 | 3 | 3 | 10 | 1,2,13 | workQueue 队列已满,增加一个线程直接执行当前新添加的任务 |
14 | 4 | 4 | 10 | 1,2,13,14 | workQueue 队列已满,继续增加一个线程直接执行当前新添加的任务 |
15 | 5 | 5 | 10 | 1,2,13,14,15 | workQueue 队列已满,增加增加一个线程直接执行当前新添加的任务, 这时线程数量达到 max 值. workQueue 与线程数都达到 临界值 |
16 | 5 | 5 | 10 | 1,2,13,14,15 |
提交 16 号任务时失败, 但已提交的任务仍能被执行 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d6e8792 rejected from java.util.concurrent.ThreadPoolExecutor@2812cbfa[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 线程数达到 max 值,并且在 workQueue 队列已满时,新提交的任务触发 RejectedExecutionHandler |
从以上的测试结果,我们不难理解构建 ThreadPoolExecutor 时的 corePoolSize, maximumPoolSize, workQueue 大小和提交任务数时的关系
- 有新任务提交后才会创建线程
- 提交新任务时,如果空闲线程少于 corePoolSize, 则创建新线程执行任务
- 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 未满时,则新任务放到 workQueue 中
- 提交新任务时,如果线程数等于 corePoolSize, 并且 workQueue 已满时,同时 maximumPoolSize 大于 corePoolSize, 则创建新线程,且用新创建的线程执行此时提交的任务
- 提交新任务时,如果线程数达到 maximumPoolSize 大小,并且 workQueue 已满时,无法提交任务,触发 RejectedExecutionHandler
如果用一张动图来展示提交任务的过程(workers 即当前线程池中的线程数)
注:以上测试没有揭示出某个任务执行完毕后,有新任务提交的情形。有兴趣的话可以对此进行测试,最后会发现 ThreadPoolExecutor 决定是否创建新线程是基于当前空闲线程的数量,有空闲线程则重用而不倾向于创建新的线程。
由于只有在 workQueue 爆满后线程池大小才会从 corePoolSize 增长至 maximumPoolSize, 因此我们用一个巨大的 workQueue 时,极有可能在内存耗尽时线程池的大小仍然是 corePoolSize 大小。如下机这样的写法
1 |
new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); |
只有在 LinkedBlockingQueue() 满了之后,线程池大小才会大于 2,往 5 方向生长。
关于构造 ThreadPoolExecutor 时的 keepAliveTime/unit 的用途
如是 keepAliveTime 非零时,当线程数大于 corePoolSize, 某个线程在空闲多久时间后结束自己,最后线程数回退到 corePoolSize 大小。以下是测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public static void main1(String[] args) throws InterruptedException { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); IntStream.rangeClosed(1, 14).forEach(i -> threadPool.submit(() -> runTask(6, i))); System.out.println(threadPool); // 1 while (threadPool.getCompletedTaskCount() != 14){ } System.out.println(threadPool); // 2 TimeUnit.SECONDS.sleep(10); System.out.println(threadPool); // 3 } |
保证在执行完最后一个任务时有线程空闲已超过 3 秒,所以输出为
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 4, active threads = 4, queued tasks = 10, completed tasks = 0] // 1
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14] // 2
java.util.concurrent.ThreadPoolExecutor@17f6480[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 14] // 3
corePoolSize 比 maxiumPoolSize 小且 keepAliveTime 不为 0 时,超过 corePoolSize 部分的线程才有机会自我消亡。我们可以考虑不同的 corePoolSize,maximumPool 的设置来处理异常的高并发,并能结束长期不用的线程来释放相应的资源 -- 内存和文件句柄
关于 RejectedExecutionHandler
RejectedExecutionHandler 在 workQueue 已满,线程数达到 maximumPoolSize 并且都很忙的时候触发,默认的 RejectedExecutionHandler 是 AbortPolicy(), 即直接丢弃新提交的任务,这可能不是我们想要的,如果 workQueue 足够大不会是个问题。另外 Java 提供了其他几个 RejectedExecutionHandler 实现
- DiscardOldestPolicy: 移除一个最旧的未开始处理的任务并重试提交新任务。会造成连续移除旧的任务
- DiscardPolicy: 安静无异常的丢弃无法提交的新任务。会造成连续的丢弃新任务
- CallerRunsPolicy: 无法提交任务的话就能提交任务的线程执行,反正闲着也是闲着。在 Caller 执行新任务时也就无法提交后续任务,不会造成连续失败。这是一个配合有限 workQueue 队列的好办法。
ThreadPoolExecutor 从 workQueue(BlockingQueue) 中获取任务是调用它的 take()
方法,无任务则阻塞, 等待。在需要往 workQueue 中添加任务时是调用 offer(Runnable)
方法,而 offer()
是能往 workQueue 中添加任务则返回 true, 否则返回 false 并触发 RejectedExecutionHandler, 无异常,也不等待。但 BlockingQueue 的 put()
方法会等待啊,所以曾经使用了下面的方式来使用有限的 workQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 |
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(10) { @Override public boolean offer(Runnable runnable) { try { this.put(runnable); } catch (InterruptedException e) { throw new RuntimeException(e); } return true; } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, workQueue); |
覆盖 offer()
方法,转换为无限等待的 put()
方法,或者可以是有等待期限的 offer(runnable, timeout, unit)
方法。
在 workQueue 已满时再提交任务的话会让提交任务的线程(Caller)一味的阻塞,无所作为,与 CallerRunsPolicy() 相比确实是浪费了 Caller 的资源。不过 CallerRunsPolicy 会有个问题,假如它在执行任务(一个重型任务)过程中,线程池里早先的任务已经执行完毕,释放了线程,但 Caller 被占用着无法及时提交更多的新任务,造成更多的线程浪费。
参考:
本文链接 https://yanbin.blog/understand-java-thread-pool-executor/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
[…] 这就是在之前 理解 Java 线程池 ThreadPoolExecutor 一文中介绍的实现代码,代码是 […]