https://cloud.tencent.com/developer/article/1755510
Java8新增了CompletableFuture
提供对异步计算的支持,可以通过回调的方式处理计算结果,CompletableFuture 类实现了CompletionStage
和Future
接口,所以还可以像之前使用Future那样使用CompletableFuture ,尽管已不再推荐这样用了。
试想下这个场景:要执行一个任务逻辑(交给另一个线程处理),并针对任务结果转换,最后执行打印操作,那么该如完成呢?一种是使用Future.get获取任务结果并执行转换逻辑,最后执行打印操作,有没有像stream那样的处理方式呢?借助CompletableFuture的话,实现代码如下:
CompletableFuture.supplyAsync(() -> "000")
.thenApply(s -> s.length()) // Function
.whenComplete((integer, throwable) -> System.out.println(integer));
从上述的示例代码,我们可以大致分析下CompletableFuture
的执行流程,首先CompletableFuture
会异步执行supply任务,当supply任务执行结束时会自动执行对应的计算s.length()逻辑,这里问题来了?
当前调用thenApply方法的线程(这里是main线程)会对CompletableFuture提交Function(对应的是计算s.length()逻辑),那么到底是哪个线程执行的计算s.length()逻辑呢?由于supply任务是由其他线程来执行的(这里对应的是ForkJoin线程),当main线程调用thenApply方法时,不能确定supply任务是否执行完毕的!因此这时就要分2种情况:
对于上述的两种情况测试代码如下:
CompletableFuture.completedFuture("000")
.thenApply(r -> r)
.whenComplete((r, e) -> System.out.println(format(r)));
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
return "111";
})
.thenApply(r->r)
.whenComplete((r, e) -> System.out.println(format(r)));
private static String format(String msg) {
return String.format("[%s] %s", Thread.currentThread().getName(), msg);
}
输出结果如下:
最后到了whenComplete的逻辑,其实仔细思考下,不管是thenApply还是whenComplete都是接下来要执行的动作,那么它们的执行逻辑应该是类似的,这里不再赘述。
在上述分析完毕后,我们来实际看下CompletableFuture源码,来一探究竟其执行流程,为了方便查看源码debug,使用如下示例代码:
CompletableFuture.supplyAsync(() -> {
// random n millisecond
int ms = new Random().nextInt(100);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ms));
String msg = String.format("supplyAsync %s ms", ms);
System.out.println(format(msg));
return msg;
}).thenApply(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
System.out.println(format("thenApply apply s.length()"));
return s.length();
}
}).whenComplete(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer s, Throwable throwable) {
System.out.println(format("done " + s));
}
});
输出结果为:
下面就按照示例代码照提交supplyAsync
、提交thenApply
、执行whenComplete
流程来进行分析,注意 CompletableFuture 的方法大都是返回新的CompletableFuture对象。
提交supplyAsync时,如果外部未传递线程池那么就会使用默认的ForkJoin线程池,然后线程池中提交AsyncSupply任务,AsyncSupply类继承ForkJoinTask并实现了Runnable,源码如下: