我们在创建 Java 线程池,无论是用 Executors, ThreadPoolExecutor, 还是 Spring 的 ThreadPoolTaskExecutor, 如果不指定工作队列的大小的话,默认为 Integer.MAX_VALUE(2147483647), 基本不会把它爆满,但是在许多的任务要执行时大量 Runnable 对象的创建却足以把内存撑爆掉。所以才有必要使用一个有限大小的工作队列,如 5000, 再配上 RejectedExecutionHandler(DiscardOldestPolicy, DiscardPolicy, 或 CallerRunsPolicy)。前两种策略会主动放弃最旧最新的任务,一般不是我们想要的,CallerRunsPolicy 还能主动发挥任务提交者的计算能力,是一种不错的选择(只可能会发生工作队列太小且提交者执行的任务太忙时产生线程池一时的空闲。
所以总结起来我们可以有以下几种实现
直接使用 CallerRunsPolicy
在工作队列满时有效利用提交任务的线程,不让它闲着,这种实现最简单, 像下面那样声明线程池
1 2 |
var threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy()); |
自定义工作队列
在自定义工作队列中让提交任务的线程一直等待 -- 主动的转换立即返回的 offer() 调用转换为持续等待的 put() 操作
这就是在之前 理解 Java 线程池 ThreadPoolExecutor 一文中介绍的实现代码,代码是
1 2 3 4 5 6 7 8 9 10 11 12 13 |
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(10) { @Override public boolean offer(Runnable runnable) { try { this.put(runnable); } catch (InterruptedException e) { throw new RuntimeException(e); } return true; } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, workQueue); |
实现 RejectedExecutionHandler 接口
接受任务提交失败,然后转换为 put() 等待. 这种方式本质上去前一种是一样的,只是被动的把立即返回的 offer() 调用转换为持续等待的 put() 操作
1 2 3 4 5 6 7 |
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), (r, executor) -> { try { executor.getQueue().put(r); } catch (InterruptedException e) { throw new RuntimeException(e); } }); |
它要比前一种方法稍微简单些,我们可以测试一下效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
private static AtomicInteger executedCount = new AtomicInteger(); private static AtomicInteger evertRejectedCount = new AtomicInteger(); public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), (r, executor) -> { try { everRejectedCount.incrementAndGet(); System.out.println("rejected"); executor.getQueue().put(r); System.out.println("submitted"); } catch (InterruptedException e) { throw new RuntimeException(e); } }); IntStream.rangeClosed(1, 100).forEach(i -> { threadPool.submit(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("task: " + executedCount.incrementAndGet()); }); }); threadPool.awaitTermination(1, TimeUnit.HOURS); System.out.printf("everRejectedCount: %s, executedCount: %s", everRejectedCount, executedCount); } |
每个任务会休眠半秒钟,肯定会造成工作队列满而提交失败的情况,看输出最后结果是
......
everRejectedCount: 40, executedCount: 100
中间有许多 rejected
的输出,总之所有任务都得到正常执行了。
按需选择吧。
can the submitter thread refresh outstanding SQS messages, rather than to fetch new messages?
没明白你想要实现什么