今天继续探讨 CompletableFuture
的特性,它并发时的性能如何呢?我们知道集合的 stream()
后的操作是序列化进行的,parallelStream()
是能够并发执行的,而用 CompletableFuture
可以更灵活的控制并发。
我们先可以对比一下 parallelStream() 与 CompletableFuture 的性能差异
假设一个这样的耗时 1000 毫秒的计算任务
1 2 3 4 5 6 7 |
private static int getJob() { try { Thread.sleep(1000); } catch (InterruptedException e) { } return 50; } |
分别用下面两个方法来测试,任务数可以通过参数来控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private static long testParallelStream(int jobCount) { List<Supplier<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob)); long start = System.currentTimeMillis(); int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum(); return System.currentTimeMillis() - start; } private static long testCompletableFutureDefaultExecutor(int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); return System.currentTimeMillis() - start; } |
写下本文的测试机器用 Runtime.getRuntime().availableProcessors()
得到的 CPU 内核数是 4, 下面是两个方法在不同的并发任务数时一组耗时对照表
方法 | jobCount | 耗时(毫秒) | 1 | 3 | 4 | 5 | 6 | 8 | 9 | 14 | 20 | 21 |
testParallelStream | 1009 | 1004 | 1002 | 2005 | 2006 | 2006 | 3012 | 4014 | 5020 | 6018 |
testCompletableFutureDefaultExecutor | 1005 | 1005 | 2005 | 2002 | 2008 | 3008 | 3008 | 5016 | 7025 | 7026 |
从上面数据来看,默认情况下,随着任务数的增加,CompletableFuture
反而比最简单的 parallelStream()
操作方式性能显得越发差一些,是不是有些失望了。慢着,我们是说默认情况下, parallelStream()
和 CompletableFuture.supplyAsync(job)
的 job 都会分配给默认 ForkJoinPool.commonPool()
去执行,而这个线程池的大小是 CPU 的内核数,所以它们没有多大区别,甚至 CompletableFuture
的方式比 parallelStream()
更快达到线程的饱和,性能还略微差一些。
可是,不用急,我们还有大招,因为 parallelStream()
不能灵活的干预线程池的大小(默认为 CPU 内核数),要改的话会影响到系统全局的 ForkJoinPool.commonPool()
的大小。可通过指定系统属性 java.util.concurrent.ForkJoinPool.common.parallelism
来指定这个线程池大小。然而 CompletableFuture
还有第二个版本的 supplyAsync(supplier, executor)
方法,由第二个参数来指定所使用的线程池,所以我们再定义一个方法,把自定义线程池大小调到 20, 然后重新进行测试并与前面数据对比
1 2 3 4 5 6 7 8 |
private static long testCompletableFutureDefaultExecutor(int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); return System.currentTimeMillis() - start; } |
测试的结果如下
方法 | jobCount | 耗时(毫秒) | 1 | 3 | 4 | 5 | 6 | 8 | 9 | 14 | 20 | 21 |
testParallelStream | 1009 | 1004 | 1002 | 2005 | 2006 | 2006 | 3012 | 4014 | 5020 | 6018 |
testCompletableFutureDefaultExecutor | 1005 | 1005 | 2005 | 2002 | 2008 | 3008 | 3008 | 5016 | 7025 | 7026 |
testCompletableFutureCustomExecutor | 1004 | 1003 | 1004 | 1002 | 1005 | 1001 | 1003 | 1002 | 1003 | 2005 |
现在 CompletableFuture
的优越性就体现出来了,任务数在没有超过线程池大小 20 的时候,仿佛在执行一个任务一样,所需耗时一保持在 1000 毫秒左右,任务数在 21 的时候才发生第一次线程等待的情况,所以耗时为 2005
毫秒。
继续观察上面那个耗时对照表,结合执行每个方法时的线程池大小来解读上面的测试结果,简单点来讲上面标记为绿色的数字开始出现了任务等待线程池中的可用线程,当把线程池调大一些并发性能就越高。当然不是线程数越大越好,因为我们这儿例子中的计算任务很简单,所以线程再大些也无妨。
对于如何选择合适的线程池大小,这里不深究了,需权衡到计算任务的耗时,是否有 I/O 的等,有一个公式是
Nthreads = Ncpu * Ucpu * (1 + W/C)
Ncpu 是 CPU 内核数,通过Runtime.getRuntime().availableProcessors()
得到的值
Ucpu 是期望的 CPU 的使用率(介于 0 与 1 之间)
W/C 是等待时间与任务执行时间的比率
下面是本篇测试用的完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
package cc.unmi; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import java.util.stream.IntStream; import static java.lang.String.format; public class Main { private static int PROCESSORS = Runtime.getRuntime().availableProcessors(); public static void main(String[] args) { System.out.println("Processors: " + PROCESSORS); Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17).forEach(offset -> { int jobNum = PROCESSORS + offset; System.out.println( format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s", jobNum, testStream(jobNum), testParallelStream(jobNum), testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum))); }); } private static long testStream(int jobCount) { List<Supplier<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob)); long start = System.currentTimeMillis(); int sum = tasks.stream().map(Supplier::get).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } private static long testParallelStream(int jobCount) { List<Supplier<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob)); long start = System.currentTimeMillis(); int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } private static long testCompletableFutureDefaultExecutor(int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } private static long testCompletableFutureCustomExecutor(int jobCount) { Executor executor = new ForkJoinPool(20); List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob, executor))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } private static int getJob() { try { Thread.sleep(1000); } catch (InterruptedException e) { } return 50; } private static void checkSum(int sum, int jobNum) { if(sum != 50 * jobNum) { throw new AssertionError("Result Error"); } } } |
运行后控制台输出
Processors: 4
When 1 tasks => stream: 1007, parallelStream: 1009, future default: 1005, future custom: 1004
When 3 tasks => stream: 3014, parallelStream: 1004, future default: 1005, future custom: 1003
When 4 tasks => stream: 4010, parallelStream: 1002, future default: 2005, future custom: 1004
When 5 tasks => stream: 5013, parallelStream: 2005, future default: 2002, future custom: 1002
When 6 tasks => stream: 6019, parallelStream: 2006, future default: 2008, future custom: 1005
When 8 tasks => stream: 8026, parallelStream: 2006, future default: 3008, future custom: 1001
When 9 tasks => stream: 9028, parallelStream: 3012, future default: 3008, future custom: 1003
When 14 tasks => stream: 14045, parallelStream: 4014, future default: 5016, future custom: 1002
When 20 tasks => stream: 20049, parallelStream: 5020, future default: 7025, future custom: 1003
When 21 tasks => stream: 21057, parallelStream: 6018, future default: 7026, future custom: 2005
不同机器上跑的需依据实际 CPU 内核和所定制的线程池大小数来调整每次测试的任务数,即须修改 Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17)
中的测试参数来覆盖某些边界值。
testParallelStream 开始计时时tasks已经准备好了,但是CompletableFuture方式开始计时还可能等待数据准备。
这个时间到底想统计那部分?直接方在函数开始的地方不是更准确吗
确实有点问题,testParallelStream 准备 tasks 时任务还没开始执行,而 CompletableFuture 准备 tasks 的同时有时任务已经开始执行了,测试不太准确,虽然准备任务的过程很快。开始时间移到函数第一行要更准确。
谢谢,需要找到四核 CPU 的机器修改代码重新跑一下获得测试结果。
这是一个神奇的测试,与其说在测试CompletableFuture的并发性能,不如说在测试线程池
可以这么说。