前言 查询商品详情页面逻辑比较复杂,有些数据需要远程调用,必然需要花费更多的时间。 假如商品详情每个页面查询,需要的如下的标准时间完成,那么用户需要10s才能完成。这里我们需采用异步查询,但是比如接口A查询商品信息,而接口B需要查询商品的SKU,接口C需要查询商品供应商等信息,如接口C必须依赖接口A或接口B的返回值。那么我们就需要使用CompletableFuture接口来实现。
一、开启异步编程 runAsync:无入参、无返回值 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) { System.out.println("main start ..." ); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("开启异步任务..." ); }, service); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 main start ... main end ... 开启异步任务...
supplyAsync :无入参,可以获取返回值 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return "开启异步任务,我是返回值" ; }, service); System.out.println("获取异步任务返回值:" + cf.get()); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 main start ... 开启异步任务 ... 获取异步任务返回值:开启异步任务,我是返回值 main end ...
二、计算完成回调 当我们想第一个异步任务执行完成后,还需要做其他的事情。我们的CompletableFuture
提供了计算完成时回调方法,whenComplete
、whenCompleteAsync
、exceptionally
等接口。
1 2 3 4 public CompletableFuture<T> whenComplete (BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn)
whenComplete
可以处理正常和异常的计算结果,exceptionally
: 处理异常情况。
whenComplete
和whenCompleteAsync
的区别是whenComplete 是执行当前任务的线程继续执行whenComplete的任务。
whenCompleteAsync: 是把whenCompleteAsync的任务继续提交给线程池来进行执行。
whenCompleteAsync 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 2 ; }, service).whenCompleteAsync((res,exp)->{ System.out.println("异步结果已经执行完成 ..." ); if (exp == null ) { System.out.println("异步执行结果为:" + res); } else { System.out.println("异步执行出错啦,出错信息为:" + exp.getMessage()); } }); System.out.println("main end ..." ); } }
执行结果:
正常1 2 3 4 5 main start ... 开启异步任务 ... main end ... 异步结果已经执行完成 ... 异步执行结果为:5
异常1 2 3 4 5 main start ... 开启异步任务 ... main end ... 异步结果已经执行完成 ... 异步执行出错啦,出错信息为:java.lang.ArithmeticException: / by zero
对于异常的捕获也可以这样写:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 0 ; }, service).whenCompleteAsync((res,exp)->{ System.out.println("异步结果已经执行完成 ..." ); System.out.println("异步执行结果为:" + res); }).exceptionally(throwable->{ System.out.println("捕获到异步执行的异常信息,出错信息为:" + throwable.getMessage()); int res = 5 ; System.out.println("返回默认异常结果:" + 5 ); return res; }); System.out.println("main end ..." ); } }
执行结果:1 2 3 4 5 6 7 main start ... 开启异步任务 ... 异步结果已经执行完成 ... 异步执行结果为:null 捕获到异步执行的异常信息,出错信息为:java.lang.ArithmeticException: / by zero 返回默认异常结果:5 main end ...
我们可以看到,通过exceptionally可以捕获异步任务抛出来的异常信息,并对异常进行处理,并可以将处理结果返回。 whenComplete虽然可以得到异常信息,但是无法修改结果,exceptionally可以感知异常,同时可以返回默认值。
三、handle最终处理 handle和whenComplete方法类似,但是whenComplete能感知异常但是不能返回结果。只能通过exceptionally进行处理。
而handle即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 0 ; }, service).handleAsync((res, exp) -> { System.out.println("进入handleAsync方法 ..." ); if (res != null ) { return res * 2 ; } if (exp != null ) { System.out.println("捕获到异步执行的异常信息,出错信息为:" + exp.getMessage()); return 0 ; } return 0 ; }, service); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); } }
执行结果:
1 2 3 4 5 6 main start ... 开启异步任务 ... 进入handleAsync方法 ... 捕获到异步执行的异常信息,出错信息为:java.lang.ArithmeticException: / by zero 获取异步任务返回值:0 main end ...
如果我们去掉异常信息,可以看到如下返回值,最终异步执行结果为10;最终执行结果如下:
1 2 3 4 5 main start ... 开启异步任务 ... 进入handleAsync方法 ... 获取异步任务返回值:10 main end ...
四、线程串行化 在CompletableFuture中有以下方法:
1 2 3 4 5 6 7 8 9 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync (Function<? super T,? extends U> fn, Executor executor) public CompletableFuture<Void> thenAccept (Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action,Executor executor) public CompletableFuture<Void> thenRun (Runnable action) public CompletableFuture<Void> thenRunAsync (Runnable action) public CompletableFuture<Void> thenRunAsync (Runnable action,Executor executor)
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值 。 thenAccept方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果 。 thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。 thenRun 获取不到上个任务的执行结果,无返回值。 thenRun thenRun 不能获取上一步的执行结果,并无返回值。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class CompletableFutureTest3 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 2 ; }, service).thenRun(()->{ System.out.println("任务2启动了..." ); }); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); } }
执行结果:
1 2 3 4 5 main start ... 开启异步任务 ... 任务2启动了... 获取异步任务返回值:null main end ...
如果我们需要获取上一步的执行结果,我们使用thenAccept;
thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果 。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CompletableFutureTest4 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 2 ; }, service).thenAcceptAsync((res)->{ System.out.println("任务2启动了... res:" + res); }); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); } }
执行结果:
1 2 3 4 5 main start ... 开启异步任务 ... 任务2启动了... res:5 获取异步任务返回值:null main end ...
如果我们即需要上一步执行结果,并需要返回值供别人使用,那么我们使用thenApply方法;
thenApply 当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值 。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CompletableFutureTest5 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 ..." ); return 10 / 2 ; }, service).thenApplyAsync((res)->{ System.out.println("任务2启动了... res:" + res); return res; }); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); } }
执行结果:
1 2 3 4 5 main start ... 开启异步任务 ... 任务2启动了... res:5 获取异步任务返回值:5 main end ...
双任务-都执行(并) 1 2 3 4 5 6 7 8 9 public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) ;public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,Executor executor) ;public <U> CompletableFuture<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) ;public <U> CompletableFuture<Void> thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) ;public CompletableFuture<Void> runAfterBoth (CompletionStage<?> other,Runnable action) };public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action) }public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other,Runnable action, Executor executor) }
两个任务必须都完成,触发该任务。
runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务) thenAcceptBoth 可以获取两个任务的返回值。 thenCombine 可以获取两个任务的返回值,并可以将任务三结果返回。 runAfterBoth 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CompletableFutureTest6 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务2 ..." ); return "will" ; }, service); f1.runAfterBothAsync(f2, () -> { System.out.println("开启异步任务3 ..." ); }, service); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 main start ... 开启异步任务1 ... 开启异步任务2 ... main end ... 开启异步任务3 ...
可以看到,任务3是在任务1和任务2执行完成后,才执行的。
thenAcceptBoth 我们使用thenAcceptBoth可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值。我们看下案例。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CompletableFutureTest7 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务2 ..." ); return "will" ; }, service); f1.thenAcceptBothAsync(f2, (f1Res,f2Res) -> { System.out.println("开启异步任务3 ...,f1Res: " + f1Res + " f2Res: " + f2Res); }, service); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 main start ... 开启异步任务1 ... 开启异步任务2 ... main end ... 开启异步任务3 ...,f1Res: 5 f2Res: will
我们可以看到,任务3在任务1和任务2执行后执行了,并获取了任务1和任务2的返回值。
thenCombineAsync 可以获取两个任务的返回值,并可以将任务三结果返回
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CompletableFutureTest8 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务2 ..." ); return "will" ; }, service); CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (f1Res, f2Res) -> { System.out.println("开启异步任务3 ...,f1Res: " + f1Res + " f2Res: " + f2Res); return f1Res + "--->" + f2Res; }, service); System.out.println("f3.get() = " + f3.get()); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 6 main start ... 开启异步任务1 ... 开启异步任务2 ... 开启异步任务3 ...,f1Res: 5 f2Res: will f3.get() = 5--->will main end ...
双任务-任意(或) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public CompletableFuture<Void> runAfterEither (CompletionStage<?> other, Runnable action) ; public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other, Runnable action) ; public CompletableFuture<Void> runAfterEitherAsync (CompletionStage<?> other, Runnable action, Executor executor) ; public CompletableFuture<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action) ; public CompletableFuture<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action) ; public CompletableFuture<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) ; public <U> CompletableFuture<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn) ; public <U> CompletableFuture<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn) ;
当两个任务中,任意一个future任务完成的时候,执行任务。
applyToEither 两个任务有一个任务执行完成,获取它的返回值,处理任务并有新的返回值。 acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。 runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。 runAfterEitherAsync 不感知结果,自己没有返回值。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CompletableFutureTest9 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("开启异步任务2 ..." ); return "will" ; }, service); CompletableFuture<Void> f3 = f1.runAfterEitherAsync(f2,()->{ System.out.println("开启异步任务3 ..." ); }, service); System.out.println("f3.get() = " + f3.get()); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 6 main start ... 开启异步任务1 ... 开启异步任务3 ... f3.get() = null main end ... 开启异步任务2 ...
我们可以看到,任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。
acceptEitherAsync 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CompletableFutureTest10 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("开启异步任务2 ..." ); return 6 ; }, service); f1.acceptEitherAsync(f2,(res)->{ System.out.println("开启异步任务3 ... res: " +res); }, service); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 main start ... 开启异步任务1 ... main end ... 开启异步任务3 ... res: 5 开启异步任务2 ...
可以看到,可以获取任务1的执行结果,但不返回执行结果。
applyToEither 可以感知结果,并返回执行结果。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class CompletableFutureTest11 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1 ..." ); return 10 / 2 ; }, service); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("开启异步任务2 ..." ); return 6 ; }, service); CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, (res) -> { System.out.println("开启异步任务3 ... res: " + res); return "will" ; }, service); System.out.println("f3.get() = " + f3.get()); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 6 main start ... 开启异步任务1 ... 开启异步任务3 ... res: 5 f3.get() = will main end ... 开启异步任务2 ...
多任务组合 1 2 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs) ;public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs) ;
allOf:等待所有任务完成 anyOf: 只要有一个任务完成 allOf 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class CompletableFutureTest12 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品图片 ..." ); return "图片地址" ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品属性..." ); return "黑色 256G" ; }, service); CompletableFuture<String> f3 = f1.supplyAsync(() -> { System.out.println("查询商品品牌..." ); return "苹果手机" ; }, service); CompletableFuture<Void> future = CompletableFuture.allOf(f1, f2, f3); future.get(); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 main start ... 查询商品图片 ... 查询商品属性... 查询商品品牌... main end ...
注:如果不使用future.get()阻塞,若其中一个任务执行时间较长,则可能会丢失任务信息。
anyOf 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class CompletableFutureTest13 { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws Exception { System.out.println("main start ..." ); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品图片 ..." ); return "图片地址" ; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品属性..." ); return "黑色 256G" ; }, service); CompletableFuture<String> f3 = f1.supplyAsync(() -> { System.out.println("查询商品品牌..." ); return "苹果手机" ; }, service); CompletableFuture<Object> future = CompletableFuture.anyOf(f1, f2, f3); System.out.println("future.get() = " + future.get()); System.out.println("main end ..." ); } }
执行结果为:
1 2 3 4 5 6 main start ... 查询商品图片 ... 查询商品属性... future.get() = 图片地址 main end ... 查询商品品牌...