CompletableFuture 的并发性能研究

今天继续探讨 CompletableFuture 的特性,它并发时的性能如何呢?我们知道集合的 stream() 后的操作是序列化进行的,parallelStream()是能够并发执行的,而用 CompletableFuture 可以更灵活的控制并发。


我们先可以对比一下 parallelStream() 与 CompletableFuture 的性能差异

假设一个这样的耗时 1000 毫秒的计算任务
1private static int getJob() {
2    try {
3        Thread.sleep(1000);
4    } catch (InterruptedException e) {
5    }
6    return 50;
7}

分别用下面两个方法来测试,任务数可以通过参数来控制
 1private static long testParallelStream(int jobCount) {
 2    List<Supplier<Integer>> tasks = new ArrayList<>();
 3    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));
 4
 5    long start = System.currentTimeMillis();
 6    int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum();
 7    return System.currentTimeMillis() - start;
 8}
 9
10private static long testCompletableFutureDefaultExecutor(int jobCount) {
11    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
12    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));
13
14    long start = System.currentTimeMillis();
15    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
16    return System.currentTimeMillis() - start;
17}

写下本文的测试机器用 Runtime.getRuntime().availableProcessors() 得到的 CPU 内核数是 4, 下面是两个方法在不同的并发任务数时一组耗时对照表

方法 | jobCount | 耗时(毫秒) 1345689142021
 testParallelStream 100910041002 2005 2006 2006 3012 4014 5020 6018
 testCompletableFutureDefaultExecutor 1005 1005 2005 2002 2008 3008 3008 5016 7025 7026

从上面数据来看,默认情况下,随着任务数的增加,CompletableFuture 反而比最简单的 parallelStream() 操作方式性能显得越发差一些,是不是有些失望了。慢着,我们是说默认情况下, parallelStream()CompletableFuture.supplyAsync(job) 的 job 都会分配给默认 ForkJoinPool.commonPool() 去执行,而这个线程池的大小是 CPU 的内核数,所以它们没有多大区别,甚至 CompletableFuture 的方式比 parallelStream() 更快达到线程的饱和,性能还略微差一些。

可是,不用急,我们还有大招,因为 parallelStream() 不能灵活的干预线程池的大小(默认为 CPU 内核数),要改的话会影响到系统全局的 ForkJoinPool.commonPool() 的大小。可通过指定系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来指定这个线程池大小。然而 CompletableFuture 还有第二个版本的  supplyAsync(supplier, executor) 方法,由第二个参数来指定所使用的线程池,所以我们再定义一个方法,把自定义线程池大小调到 20, 然后重新进行测试并与前面数据对比
1private static long testCompletableFutureDefaultExecutor(int jobCount) {
2    List<CompletableFuture<Integer>> tasks = new ArrayList<>();
3    IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));
4
5    long start = System.currentTimeMillis();
6    int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
7    return System.currentTimeMillis() - start;
8}

测试的结果如下

方法 | jobCount | 耗时(毫秒)1345689142021
testParallelStream1009100410022005200620063012401450206018
testCompletableFutureDefaultExecutor1005100520052002200830083008501670257026
testCompletableFutureCustomExecutor1004100310041002100510011003100210032005

现在 CompletableFuture 的优越性就体现出来了,任务数在没有超过线程池大小 20 的时候,仿佛在执行一个任务一样,所需耗时一保持在 1000 毫秒左右,任务数在 21 的时候才发生第一次线程等待的情况,所以耗时为 2005 毫秒。

继续观察上面那个耗时对照表,结合执行每个方法时的线程池大小来解读上面的测试结果,简单点来讲上面标记为绿色的数字开始出现了任务等待线程池中的可用线程,当把线程池调大一些并发性能就越高。当然不是线程数越大越好,因为我们这儿例子中的计算任务很简单,所以线程再大些也无妨。

对于如何选择合适的线程池大小,这里不深究了,需权衡到计算任务的耗时,是否有 I/O 的等,有一个公式是
Nthreads = Ncpu * Ucpu * (1 + W/C) Ncpu 是 CPU 内核数,通过 Runtime.getRuntime().availableProcessors() 得到的值
Ucpu 是期望的 CPU 的使用率(介于 0 与 1 之间)
W/C 是等待时间与任务执行时间的比率
下面是本篇测试用的完整代码:
 1package cc.unmi;
 2
 3import java.util.ArrayList;
 4import java.util.Arrays;
 5import java.util.List;
 6import java.util.concurrent.CompletableFuture;
 7import java.util.concurrent.Executor;
 8import java.util.concurrent.ForkJoinPool;
 9import java.util.function.Supplier;
10import java.util.stream.IntStream;
11
12import static java.lang.String.format;
13
14public class Main {
15
16    private static int PROCESSORS = Runtime.getRuntime().availableProcessors();
17
18    public static void main(String[] args) {
19        System.out.println("Processors: " + PROCESSORS);
20
21        Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17).forEach(offset -> {
22            int jobNum = PROCESSORS + offset;
23            System.out.println(
24                format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s",
25                    jobNum, testStream(jobNum), testParallelStream(jobNum),
26                    testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum)));
27        });
28
29    }
30
31    private static long testStream(int jobCount) {
32        List<Supplier<Integer>> tasks = new ArrayList<>();
33        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));
34
35        long start = System.currentTimeMillis();
36        int sum = tasks.stream().map(Supplier::get).mapToInt(Integer::intValue).sum();
37        checkSum(sum, jobCount);
38        return System.currentTimeMillis() - start;
39    }
40
41    private static long testParallelStream(int jobCount) {
42        List<Supplier<Integer>> tasks = new ArrayList<>();
43        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(Main::getJob));
44
45        long start = System.currentTimeMillis();
46        int sum = tasks.parallelStream().map(Supplier::get).mapToInt(Integer::intValue).sum();
47        checkSum(sum, jobCount);
48        return System.currentTimeMillis() - start;
49    }
50
51    private static long testCompletableFutureDefaultExecutor(int jobCount) {
52        List<CompletableFuture<Integer>> tasks = new ArrayList<>();
53        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob)));
54
55        long start = System.currentTimeMillis();
56        int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
57        checkSum(sum, jobCount);
58        return System.currentTimeMillis() - start;
59    }
60
61    private static long testCompletableFutureCustomExecutor(int jobCount) {
62        Executor executor = new ForkJoinPool(20);
63
64        List<CompletableFuture<Integer>> tasks = new ArrayList<>();
65        IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(Main::getJob, executor)));
66
67        long start = System.currentTimeMillis();
68        int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum();
69        checkSum(sum, jobCount);
70        return System.currentTimeMillis() - start;
71    }
72
73    private static int getJob() {
74        try {
75            Thread.sleep(1000);
76        } catch (InterruptedException e) {
77        }
78
79        return 50;
80    }
81
82    private static void checkSum(int sum, int jobNum) {
83        if(sum != 50 * jobNum) {
84            throw new AssertionError("Result Error");
85        }
86    }
87}

运行后控制台输出
Processors: 4
When 1 tasks => stream: 1007, parallelStream: 1009, future default: 1005, future custom: 1004
When 3 tasks => stream: 3014, parallelStream: 1004, future default: 1005, future custom: 1003
When 4 tasks => stream: 4010, parallelStream: 1002, future default: 2005, future custom: 1004
When 5 tasks => stream: 5013, parallelStream: 2005, future default: 2002, future custom: 1002
When 6 tasks => stream: 6019, parallelStream: 2006, future default: 2008, future custom: 1005
When 8 tasks => stream: 8026, parallelStream: 2006, future default: 3008, future custom: 1001
When 9 tasks => stream: 9028, parallelStream: 3012, future default: 3008, future custom: 1003
When 14 tasks => stream: 14045, parallelStream: 4014, future default: 5016, future custom: 1002
When 20 tasks => stream: 20049, parallelStream: 5020, future default: 7025, future custom: 1003
When 21 tasks => stream: 21057, parallelStream: 6018, future default: 7026, future custom: 2005
不同机器上跑的需依据实际 CPU 内核和所定制的线程池大小数来调整每次测试的任务数,即须修改 Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17) 中的测试参数来覆盖某些边界值。 永久链接 https://yanbin.blog/completablefuture-concurrent-performance/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。