https://cloud.tencent.com/developer/article/1755510

Java8新增了CompletableFuture 提供对异步计算的支持,可以通过回调的方式处理计算结果,CompletableFuture 类实现了CompletionStageFuture接口,所以还可以像之前使用Future那样使用CompletableFuture ,尽管已不再推荐这样用了。

试想下这个场景:要执行一个任务逻辑(交给另一个线程处理),并针对任务结果转换,最后执行打印操作,那么该如完成呢?一种是使用Future.get获取任务结果并执行转换逻辑,最后执行打印操作,有没有像stream那样的处理方式呢?借助CompletableFuture的话,实现代码如下:

CompletableFuture.supplyAsync(() -> "000")
        .thenApply(s -> s.length()) // Function
        .whenComplete((integer, throwable) -> System.out.println(integer));

从上述的示例代码,我们可以大致分析下CompletableFuture的执行流程,首先CompletableFuture会异步执行supply任务,当supply任务执行结束时会自动执行对应的计算s.length()逻辑,这里问题来了?

当前调用thenApply方法的线程(这里是main线程)会对CompletableFuture提交Function(对应的是计算s.length()逻辑),那么到底是哪个线程执行的计算s.length()逻辑呢?由于supply任务是由其他线程来执行的(这里对应的是ForkJoin线程),当main线程调用thenApply方法时,不能确定supply任务是否执行完毕的!因此这时就要分2种情况:

对于上述的两种情况测试代码如下:

CompletableFuture.completedFuture("000")
        .thenApply(r -> r)
        .whenComplete((r, e) -> System.out.println(format(r)));

CompletableFuture.supplyAsync(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
            return "111";
        })
        .thenApply(r->r)
        .whenComplete((r, e) -> System.out.println(format(r)));

private static String format(String msg) {
    return String.format("[%s] %s", Thread.currentThread().getName(), msg);
}

输出结果如下:

https://ask.qcloudimg.com/http-save/yehe-5964114/eqd9j685z5.png

最后到了whenComplete的逻辑,其实仔细思考下,不管是thenApply还是whenComplete都是接下来要执行的动作,那么它们的执行逻辑应该是类似的,这里不再赘述。

在上述分析完毕后,我们来实际看下CompletableFuture源码,来一探究竟其执行流程,为了方便查看源码debug,使用如下示例代码:

CompletableFuture.supplyAsync(() -> {
    // random n millisecond
    int ms = new Random().nextInt(100);
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ms));

    String msg = String.format("supplyAsync %s ms", ms);
    System.out.println(format(msg));
    return msg;
}).thenApply(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        System.out.println(format("thenApply apply s.length()"));
        return s.length();
    }
}).whenComplete(new BiConsumer<Integer, Throwable>() {
    @Override
    public void accept(Integer s, Throwable throwable) {
        System.out.println(format("done " + s));
    }
});

输出结果为:

https://ask.qcloudimg.com/http-save/yehe-5964114/4dyqr0ao0s.png

下面就按照示例代码照提交supplyAsync提交thenApply执行whenComplete流程来进行分析,注意 CompletableFuture 的方法大都是返回新的CompletableFuture对象。

提交supplyAsync时,如果外部未传递线程池那么就会使用默认的ForkJoin线程池,然后线程池中提交AsyncSupply任务,AsyncSupply类继承ForkJoinTask并实现了Runnable,源码如下: