CompletableFuture 异步编排详解

前言

查询商品详情页面逻辑比较复杂,有些数据需要远程调用,必然需要花费更多的时间。
假如商品详情每个页面查询,需要的如下的标准时间完成,那么用户需要10s才能完成。这里我们需采用异步查询,但是比如接口A查询商品信息,而接口B需要查询商品的SKU,接口C需要查询商品供应商等信息,如接口C必须依赖接口A或接口B的返回值。那么我们就需要使用CompletableFuture接口来实现。

一、开启异步编程

runAsync:无入参、无返回值

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CompletableFutureTest3 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) {
System.out.println("main start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("开启异步任务...");
}, service);
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
main start ...
main end ...
开启异步任务...

supplyAsync :无入参,可以获取返回值

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CompletableFutureTest3 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return "开启异步任务,我是返回值";

}, service);
System.out.println("获取异步任务返回值:" + cf.get());
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
main start ...
开启异步任务 ...
获取异步任务返回值:开启异步任务,我是返回值
main end ...

二、计算完成回调

当我们想第一个异步任务执行完成后,还需要做其他的事情。我们的CompletableFuture提供了计算完成时回调方法,whenCompletewhenCompleteAsyncexceptionally等接口。

1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally: 处理异常情况。

whenCompletewhenCompleteAsync 的区别是whenComplete 是执行当前任务的线程继续执行whenComplete的任务。

whenCompleteAsync: 是把whenCompleteAsync的任务继续提交给线程池来进行执行。

whenCompleteAsync

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest3 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return 10 / 2;
}, service).whenCompleteAsync((res,exp)->{
System.out.println("异步结果已经执行完成 ...");
if (exp == null) {
System.out.println("异步执行结果为:" + res);
} else {
System.out.println("异步执行出错啦,出错信息为:" + exp.getMessage());
}
});
System.out.println("main end ...");
}

}

执行结果:

  • 正常
    1
    2
    3
    4
    5
    main start ...
    开启异步任务 ...
    main end ...
    异步结果已经执行完成 ...
    异步执行结果为:5
  • 异常
    1
    2
    3
    4
    5
    main start ...
    开启异步任务 ...
    main end ...
    异步结果已经执行完成 ...
    异步执行出错啦,出错信息为:java.lang.ArithmeticException: / by zero
    对于异常的捕获也可以这样写:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class CompletableFutureTest3 {  

    // 自定义线程池
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws Exception {
    System.out.println("main start ...");
    CompletableFuture.supplyAsync(() -> {
    System.out.println("开启异步任务 ...");
    return 10 / 0;
    }, service).whenCompleteAsync((res,exp)->{
    System.out.println("异步结果已经执行完成 ...");
    System.out.println("异步执行结果为:" + res);
    }).exceptionally(throwable->{
    System.out.println("捕获到异步执行的异常信息,出错信息为:" + throwable.getMessage());
    int res = 5;
    System.out.println("返回默认异常结果:" + 5);
    return res;
    });
    System.out.println("main end ...");
    }

    }
    执行结果:
    1
    2
    3
    4
    5
    6
    7
    main start ...
    开启异步任务 ...
    异步结果已经执行完成 ...
    异步执行结果为:null
    捕获到异步执行的异常信息,出错信息为:java.lang.ArithmeticException: / by zero
    返回默认异常结果:5
    main end ...
    我们可以看到,通过exceptionally可以捕获异步任务抛出来的异常信息,并对异常进行处理,并可以将处理结果返回。

whenComplete虽然可以得到异常信息,但是无法修改结果,exceptionally可以感知异常,同时可以返回默认值。

三、handle最终处理

handle和whenComplete方法类似,但是whenComplete能感知异常但是不能返回结果。只能通过exceptionally进行处理。

而handle即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CompletableFutureTest3 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return 10 / 0;
}, service).handleAsync((res, exp) -> {
System.out.println("进入handleAsync方法 ...");
if (res != null) {
return res * 2;
}
if (exp != null) {
System.out.println("捕获到异步执行的异常信息,出错信息为:" + exp.getMessage());
return 0;
}
return 0;
}, service);
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}

}

执行结果:

1
2
3
4
5
6
main start ...
开启异步任务 ...
进入handleAsync方法 ...
捕获到异步执行的异常信息,出错信息为:java.lang.ArithmeticException: / by zero
获取异步任务返回值:0
main end ...

如果我们去掉异常信息,可以看到如下返回值,最终异步执行结果为10;最终执行结果如下:

1
2
3
4
5
main start ...
开启异步任务 ...
进入handleAsync方法 ...
获取异步任务返回值:10
main end ...

四、线程串行化

在CompletableFuture中有以下方法:

1
2
3
4
5
6
7
8
9
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值
  • thenAccept方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果
  • thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
    thenRun 获取不到上个任务的执行结果,无返回值。

thenRun

thenRun 不能获取上一步的执行结果,并无返回值。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CompletableFutureTest3 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return 10 / 2;
}, service).thenRun(()->{
System.out.println("任务2启动了...");
});
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}

}

执行结果:

1
2
3
4
5
main start ...
开启异步任务 ...
任务2启动了...
获取异步任务返回值:null
main end ...

如果我们需要获取上一步的执行结果,我们使用thenAccept;

thenAccept

消费处理结果,接收任务的处理结果,并消费处理,无返回结果

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CompletableFutureTest4 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return 10 / 2;
}, service).thenAcceptAsync((res)->{
System.out.println("任务2启动了... res:" + res);
});
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}
}

执行结果:

1
2
3
4
5
main start ...
开启异步任务 ...
任务2启动了... res:5
获取异步任务返回值:null
main end ...

如果我们即需要上一步执行结果,并需要返回值供别人使用,那么我们使用thenApply方法;

thenApply

当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CompletableFutureTest5 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 ...");
return 10 / 2;
}, service).thenApplyAsync((res)->{
System.out.println("任务2启动了... res:" + res);
return res;
});
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}

}

执行结果:

1
2
3
4
5
main start ...
开启异步任务 ...
任务2启动了... res:5
获取异步任务返回值:5
main end ...

双任务-都执行(并)

1
2
3
4
5
6
7
8
9
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,         BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)};
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) }
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action, Executor executor)}

两个任务必须都完成,触发该任务。

  • runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
  • thenAcceptBoth 可以获取两个任务的返回值。
  • thenCombine 可以获取两个任务的返回值,并可以将任务三结果返回。

runAfterBoth

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest6 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务2 ...");
return "will";
}, service);
f1.runAfterBothAsync(f2, () -> {
System.out.println("开启异步任务3 ...");
}, service);
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
main start ...
开启异步任务1 ...
开启异步任务2 ...
main end ...
开启异步任务3 ...

可以看到,任务3是在任务1和任务2执行完成后,才执行的。

thenAcceptBoth

我们使用thenAcceptBoth可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值。我们看下案例。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CompletableFutureTest7 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务2 ...");
return "will";
}, service);
f1.thenAcceptBothAsync(f2, (f1Res,f2Res) -> {
System.out.println("开启异步任务3 ...,f1Res: " + f1Res + " f2Res: " + f2Res);
}, service);
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
main start ...
开启异步任务1 ...
开启异步任务2 ...
main end ...
开启异步任务3 ...,f1Res: 5 f2Res: will

我们可以看到,任务3在任务1和任务2执行后执行了,并获取了任务1和任务2的返回值。

thenCombineAsync

可以获取两个任务的返回值,并可以将任务三结果返回

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CompletableFutureTest8 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务2 ...");
return "will";
}, service);
CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (f1Res, f2Res) -> {
System.out.println("开启异步任务3 ...,f1Res: " + f1Res + " f2Res: " + f2Res);
return f1Res + "--->" + f2Res;
}, service);
System.out.println("f3.get() = " + f3.get());
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
6
main start ...
开启异步任务1 ...
开启异步任务2 ...
开启异步任务3 ...,f1Res: 5 f2Res: will
f3.get() = 5--->will
main end ...

双任务-任意(或)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);  

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);

当两个任务中,任意一个future任务完成的时候,执行任务。

  • applyToEither 两个任务有一个任务执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

runAfterEitherAsync

不感知结果,自己没有返回值。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CompletableFutureTest9 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("开启异步任务2 ...");
return "will";
}, service);
CompletableFuture<Void> f3 = f1.runAfterEitherAsync(f2,()->{
System.out.println("开启异步任务3 ...");
}, service);
System.out.println("f3.get() = " + f3.get());
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
6
main start ...
开启异步任务1 ...
开启异步任务3 ...
f3.get() = null
main end ...
开启异步任务2 ...

我们可以看到,任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。

acceptEitherAsync

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CompletableFutureTest10 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("开启异步任务2 ...");
return 6;
}, service);
f1.acceptEitherAsync(f2,(res)->{
System.out.println("开启异步任务3 ... res: "+res);
}, service);
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
main start ...
开启异步任务1 ...
main end ...
开启异步任务3 ... res: 5
开启异步任务2 ...

可以看到,可以获取任务1的执行结果,但不返回执行结果。

applyToEither

可以感知结果,并返回执行结果。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CompletableFutureTest11 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1 ...");
return 10 / 2;
}, service);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("开启异步任务2 ...");
return 6;
}, service);
CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, (res) -> {
System.out.println("开启异步任务3 ... res: " + res);
return "will";
}, service);
System.out.println("f3.get() = " + f3.get());
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
6
main start ...
开启异步任务1 ...
开启异步任务3 ... res: 5
f3.get() = will
main end ...
开启异步任务2 ...

多任务组合

1
2
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
  • allOf:等待所有任务完成
  • anyOf: 只要有一个任务完成

allOf

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CompletableFutureTest12 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片 ...");
return "图片地址";
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性...");
return "黑色 256G";
}, service);
CompletableFuture<String> f3 = f1.supplyAsync(() -> {
System.out.println("查询商品品牌...");
return "苹果手机";
}, service);
CompletableFuture<Void> future = CompletableFuture.allOf(f1, f2, f3);
future.get();//等待索引结果完成
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
main start ...
查询商品图片 ...
查询商品属性...
查询商品品牌...
main end ...

注:如果不使用future.get()阻塞,若其中一个任务执行时间较长,则可能会丢失任务信息。

anyOf

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CompletableFutureTest13 {  

// 自定义线程池
public static ExecutorService service = Executors.newFixedThreadPool(5);

public static void main(String[] args) throws Exception {
System.out.println("main start ...");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片 ...");
return "图片地址";
}, service);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性...");
return "黑色 256G";
}, service);
CompletableFuture<String> f3 = f1.supplyAsync(() -> {
System.out.println("查询商品品牌...");
return "苹果手机";
}, service);
CompletableFuture<Object> future = CompletableFuture.anyOf(f1, f2, f3);
System.out.println("future.get() = " + future.get());
System.out.println("main end ...");
}

}

执行结果为:

1
2
3
4
5
6
main start ...
查询商品图片 ...
查询商品属性...
future.get() = 图片地址
main end ...
查询商品品牌...

CompletableFuture 异步编排详解
https://github.com/yangxiangnanwill/yangxiangnanwill.github.io/2024/01/03/好好码代码吖/JAVA/异步编程/异步编程(3)-CompletableFuture 异步编排详解/
作者
will
发布于
2024年1月3日
许可协议