异步编程(2)-Java异步编程指北

前言

本文简要介绍一下Java中可支持异步编程的类和使用方法,方便后期回顾和汇总。

同步异步优缺点

在我们平时开发中或多或少都会遇到需要调用接口来完成一个功能的需求,这个接口可以是内部系统也可以是外部的,然后等到接口返回数据了才能继续其他的业务流程,这就是传统的 同步模式。

同步模式虽然简单但缺点也很明显,如果对方服务处理缓慢迟迟未能返回数据,或网络问题导致响应变长,就会阻塞我们调用方的线程,导致我们主流程的耗时的延迟,传统的解决方式是增加接口、网关的超时(timeout)设置,防止无限期等待。但即使这样还是会占用CPU资源。

在我们做rpc远程调用,redis,数据库访问等比较耗时的网络请求时经常要面对这样的问题,这种业务场景我们可以引入异步的编程思想,即主流程不需要阻塞等待接口返回数据,而是继续往下执行,当真正需要这个接口返回结果时再通过回调或阻塞的方式获取,此时我们的主流程和异步任务是并行执行的。

Java中实现异步主要是通过Future,CompletableFuture,Guava ListenableFuture以及一些异步响应式框架如RxJava实现。

下面我们主要看下这几种组件适用的业务场景和需要注意的地方,避免踩坑。

Future

Future 表示异步计算的结果。提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。只有在计算完成后才能使用方法 get 检索结果,必要时阻止,直到准备就绪。取消是通过该方法 cancel 执行的。提供了其他方法来确定任务是正常完成还是已取消。计算完成后,无法取消计算。如果您想为了可取消性而使用Future ,但不提供可用的结果,则可以声明Future<?> 的类型并作为基础任务的结果返回 null。
接口信息如下图所示:

实际开发中我们一般会结合线程池的submit配合使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* AsyncTest 异步
* @author will
*/public class FutureTest {

public static void main(String[] args) throws Exception{
//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 模拟接口调用
Future<String> submit = executor.submit(() -> {
Thread.sleep(2*1000);
return "hello world";
});

System.out.println("异步执行结果 = " + submit.get());
System.out.println("submit.isDone() = " + submit.isDone());

}
}

简单的说我有一个任务,提交给了Future,Future替我完成这个任务,这期间我可以去做别的事情。一段时间之后,我再从Future取出结果。

上面的代码有2个地方需要注意:

  • 不建议使用future.get()方式,而应该使用future.get(long timeout, TimeUnit unit)
    尤其是在生产环境一定要设置合理的超时时间,防止程序无限期等待下去
  • 另外就是要考虑异步任务执行过程中报错抛出异常的情况,需要捕获future的异常信息。
    通过代码可以看出一些简单的异步场景可以使用Future解决,但是对于结果的获取却不是很方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式相当于把异步变成了同步,显然和异步编程的初衷相违背,轮询的方式又会浪费CPU资源。

Future没有提供通知的机制,就是回调,我们无法知道它什么时间完成任务。

而且在复杂一点的情况下,比如多个异步任务的场景,一个异步任务依赖上一个异步任务的执行结果,异步任务合并等,Future无法满足需求。

ListenableFuture

Google并发包下的listenableFuture对Java原生的future做了扩展,顾名思义就是使用监听器模式实现的回调,所以叫可监听的future。

要使用listenableFuture还要结合MoreExecutor线程池,MoreExecutor是对Java原生线程池的封装,比如常用的MoreExecutors.listeningDecorator(threadPool); 修改Java原生线程池的submit方法,封装了future返回listenableFuture。
接口信息如下图所示:

代码示例如下:
首先还是创建线程池,模拟业务调用

1
2
3
4
5
6
7
8
//创建线程池  
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
// 模拟接口调用
ListenableFuture<String> future = executor.submit(() -> {
Thread.sleep(2*1000);
// int i = 1 / 0;
return "hello world";
});

上面的代码是构造了一个ListenableFuture的异步任务,调用它的结果一般有两种方式:

  • 基于监听器(addListener)
    1
    2
    3
    4
    5
    6
    7
    8
    // 基于监听机制  
    future.addListener(() -> {
    try {
    System.out.println("异步结果:" + future.get());
    } catch (Exception e) {
    e.printStackTrace();
    }
    }, executor);
  • 基于回调(addCallback)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 基于回调机制  
    Futures.addCallback(future, new FutureCallback<String>() {
    @Override
    public void onSuccess(@Nullable String result) {
    System.out.println("异步结果:" + result);
    }
    @Override
    public void onFailure(Throwable t) {
    System.out.println("异步结果错误:" + t.getMessage());
    }
    });
    上述代码用例可以发现我注释掉了一个int i = 1 / 0,在学习过程中可以放开来看看结果,对比验证一下。
    OK,上述如果验证完毕后,可以比对看下面的汇总,加深一下使用区别。
方法使用说明
addListener需要自己代码里捕获处理异常情况,最好设置超时时间
addCallback把正常返回和异常情况做了分离,方便我们针对不同情况做处理

另外Futures里还有很多其他的api,可以满足我们负责场景,比如transform()可以处理异步任务之间的依赖情况,allAsList()将多个ListenableFuture合并成一个。

CompletableFuture

如果你们公司的jdk是8或以上的版本,那可以直接使用CompletableFuture类来实现异步编程。

Java8新增的CompletableFuture类借鉴了Google Guava的ListenableFuture,它包含50多个方法,默认使用forkJoinPool线程池,提供了非常强大的Future扩展功能,可以帮助我们简化异步编程的复杂性,结合函数式编程,通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的多种方法,可以满足大部分异步回调场景。
接口信息如下图所示:

虽然方法很多但有个特征:

  • 以Async结尾的方法签名表示是在异步线程里执行,没有以Async结尾的方法则是由主线程调用
  • 如果参数里有Runnable类型,则没有返回结果,即纯消费的方法
  • 如果参数里没有指定executor则默认使用forkJoinPool线程池,指定了则以指定的线程池来执行任务

thenApplyAsync

示例代码

==这里先说明一下,示例代码只关注核心功能,如果要实际使用需要考虑超时和异常情况,大家需要注意。==

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CompletableFuture<String> f1 = CompletableFuture  
.supplyAsync(() -> {
System.out.println("...开始执行 hello");
String hello = "hello! ";
int i = 1/0;
System.out.println("...执行hello完毕,执行结果:" + hello);
return hello;
});
CompletableFuture<String> f2 = f1.thenApplyAsync(s -> {
System.out.println("...开始执行 world");
String world = "world!";
System.out.println("...获取上一步结果,结果为:" + s);
System.out.println("开始输出 " + world);
return s + world;
});
System.out.println("f2.get() = " + f2.get());

在上面的代码中异步任务f2需要异步任务f1的结果才能执行,但对于我们的主线程来说,无须等到f1返回结果后再调用函数f2,即不会阻塞主流程,而是告诉CompletableFuture当执行完了f1的方法再去执行f2,只有当需要最后的结果时再获取。

thenComposeAsync

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CompletableFuture<String> f1 = CompletableFuture  
.supplyAsync(() -> {
System.out.println("...开始执行 hello");
String hello = "hello! ";
int i = 1/0;
System.out.println("...执行hello完毕,执行结果:" + hello);
return hello;
});
CompletableFuture<String> f2 = f1.thenComposeAsync(t -> CompletableFuture.supplyAsync(()->{
System.out.println("...开始执行 world");
String world = "world!";
System.out.println("...获取上一步结果,结果为:" + t);
System.out.println("开始输出 " + world);
return t + world;
}));
System.out.println("f2.get() = " + f2.get());

上面两个案例的输出结果相同,但是两者还是有细微的差异的,差异信息如下方所示:

方法名称描述
thenComposeAsync将异步操作的结果保存到 CompletableFuture 中,并返回一个新的 CompletableFuture
thenApplyAsync将异步操作的结果应用于输入参数,并返回一个新的 CompletableFuture
通过代码注释能看出thenCompose相当于flatMap,避免CompletableFuture<CompletableFuture>这种写法。

这也是thenComposethenApply的区别,通过查看api也能看出:
thenApply:

1
2
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}

thenCompose:

1
2
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(screenExecutor(executor), fn);
}

thenCombineAsync

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long time = System.currentTimeMillis();  
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
// 模拟接口耗时,1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello! ";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
// 模拟接口耗时,1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "world! ";
});
CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (s1, s2) -> s1 + s2);
System.out.println("异步结果:" + f3.get());
System.out.println("耗时:" + (System.currentTimeMillis() - time));

从代码输出结果可以看到两个异步任务f1、f2是并行执行,彼此无先后依赖顺序,thenCombineAsync适合将两个并行执行的异步任务的结果合并返回成一个新的future。

还有一个类似的方法thenAcceptBoth也是合并两个future的结果,但是不会返回新的值,内部消费掉了。

applyToEitherAsync

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {  
// 模拟接口耗时,1s
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello! ";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
// 模拟接口耗时,1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "world! ";
});
CompletableFuture<String> f3 = f1.applyToEitherAsync(f2, t -> t);
System.out.println("异步结果:" + f3.get());

输出的结果:world,哪个future先执行完就根据它的结果计算,取两个future最先返回的。

这里要说明一点,如果两个future是同时返回结果,那么applyToEitherAsync永远以第一个future的结果为准,大家可以把上面代码的Thread.sleep注释掉测试下。

另外acceptEither方法和这个类似,但是没有返回值。

allOf / anyOf
前面讲的compose,combine,either都是处理两个future的方法,如果是超过2个的可以使用allOf或anyOf

allOf

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CompletableFuture<Void> f1 = CompletableFuture.allOf(  
CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}), CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "!!!!!!";
})
);
System.out.println("f1.get() = " + f1.get());

allOf方法是当所有的CompletableFuture都执行完后执行计算,无返回值

anyOf

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CompletableFuture<Object> f1 = CompletableFuture.anyOf(  
CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}), CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), CompletableFuture.supplyAsync(() -> {
try {
// 模拟接口调用耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "!!!!!!";
})
);
System.out.println("f1.get() = " + f1.get());

多次执行,执行结果为”!!!!!!”,anyOf方法当任意一个CompletableFuture执行完后就会执行计算。

虽然说CompletableFuture更适合I/O场景,但使用时一定要结合具体业务,比如说有些公共方法处理异步任务时需要考虑异常情况,这时候使用CompletableFuture.handle(BiFunction<? super T, Throwable, ? extends U> fn)更合适,handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。

CompletableFuture还有一个坑需要注意:如果线上流量比较大的情况下会出现响应缓慢的问题。

因为CompletableFuture默认使用的线程池是forkJoinPool,当时对一台使用了CompletableFuture实现异步回调功能的接口做压测,通过监控系统发现有大量的ForkJoinPool.commonPool-worker-* 线程处于等待状态,进一步分析dump信息发现是forkJoinPool的makeCommonPool问题。
具体原因为:

java.util.concurrent.ForkJoinPool.common.parallelism的值,那么forkJoinPool线程池的线程数就是(cpu-1),如果我们机器是2核,这样实际执行任务的线程数只有1个,当有大量请求过来时,如果有耗时高的io操作,势必会造成更多的线程等待,进而拖累服务响应时间。

解决方案一个是设置java.util.concurrent.ForkJoinPool.common.parallelism这个值(要在项目启动时指定),或者指定线程池不使用默认的forkJoinPool。根据阿里巴巴规范建议使用自定义的线程池。

那么如何比较好的设置线程数量,以下参考《Java并发编程实战》中的公式:
$$threads = N CPU * U CPU * (1 + W/C)$$

  • N CPU 是处理器的核数
  • U CPU 是期望的CPU利用率(介于0和1之间)
  • W/C是等待时间与计算时间的比率

网上也有这么区分的:

  • 如果服务是cpu密集型的,设置为电脑的核数
  • 如果服务是io密集型的,设置为电脑的核数*2

其实并不严谨,尤其是io密集型的还要参考QPS和web服务器的配置。


异步编程(2)-Java异步编程指北
https://github.com/yangxiangnanwill/yangxiangnanwill.github.io/2024/01/03/好好码代码吖/JAVA/异步编程/异步编程(2)-Java异步编程指北/
作者
will
发布于
2024年1月3日
许可协议