JDBC连接参数【rewriteBatchedStatements】详解

前言

最近博猪在做一些风险测算相关的工作,大体业务流程就是业务人员通过Excel录入数据后,后台通过默认的风险因子计算后,批量入库,同时会把业务人员导入的测算数据的Excel上传,由于这块操作比较多,但是因为后台使用的ORM框架是MybatisPlus,里面提供的一些API还是挺便捷的,但是同步通过输入测验发现一个问题,下面就以一个案例进入一下今天的主题,细扒一下MyBatisPlus的批量插入操作。

rewriteBatchedStatements参数

MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL,另外这个选项对INSERT/UPDATE/DELETE都有效

添加rewriteBatchedStatements=true这个参数后的执行速度比较:

1
jdbc:mysql://数据库地址/数据库名?useUnicode=true&characterEncoding=UTF8&allowMultiQueries=true&rewriteBatchedStatements=true

测试环境介绍

测试代码在本地执行,数据库使用本地虚拟机。

宿主环境

  • 物理机处理为AMD Ryzen 9 5900HX with Radeon Graphics 3.30 GHz 48G运行内存 WIN11(OS)
  • MySQL为8.0.30,虚拟机(4G运存,20G内存)
  • jdk版本为1.8
  • 日志级别调整到info或者warn级别,减少日志打印性能的消耗

代码

表结构

1
2
3
4
5
6
7
CREATE TABLE `user` (
`id` bigint NOT NULL COMMENT '主键ID',
`name` varchar(30) DEFAULT NULL COMMENT '姓名',
`age` int DEFAULT NULL COMMENT '年龄',
`email` varchar(50) DEFAULT NULL COMMENT '邮箱',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* BatchDemo
* @author will
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = App.class)
public class BatchDemo {

@Autowired
private UserService userService;

@Test
public void testSave() {
StopWatch stopWatch = new StopWatch("测试批量保存");
int loopNum = 10;
int batchNum = 10_000;
for (int i = 1; i <= loopNum; i++) {
stopWatch.start("用户保存" + i);
ArrayList<User> users = new ArrayList<>();
for (int j = 1; j <= batchNum; j++) {
User user = User.builder()
.id(IdWorker.getId())
.name("test-")
.age(16)
.email("@qq.com")
.build();
users.add(user);
}
userService.saveBatch(users, users.size());
stopWatch.stop();
System.out.println(stopWatch.prettyPrint(TimeUnit.MILLISECONDS));
}
}
}

测试

普通saveBatch批量插入

我们循环1万次,把每个实例对象装到集合(List)中,然后调用Mybatis-Plus的saveBatch方法,传入List集合,实现批量对象的插入,然后我们在方法开始结束的地方,计算当前函数执行时长。为了测试严谨性,我们重复执行十次,下面是执行的时长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
StopWatch '测试批量保存': running time = 88867 ms
---------------------------------------------
ms % Task name
---------------------------------------------
000011977 13% 用户保存1
000008739 10% 用户保存2
000008547 10% 用户保存3
000008636 10% 用户保存4
000008610 10% 用户保存5
000008490 10% 用户保存6
000008382 09% 用户保存7
000008287 09% 用户保存8
000008624 10% 用户保存9
000008570 10% 用户保存10

OK,我们清空表结构,把数据量调整到10万次,,同时调整一下保存操作为userService.saveBatch(users);执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
StopWatch '测试批量保存': running time = 888607 ms
---------------------------------------------
ms % Task name
---------------------------------------------
000093144 10% 用户保存1
000087531 10% 用户保存2
000085724 10% 用户保存3
000092762 10% 用户保存4
000085413 10% 用户保存5
000090576 10% 用户保存6
000087468 10% 用户保存7
000086718 10% 用户保存8
000089837 10% 用户保存9
000089432 10% 用户保存10

这性能差异立马上来了,然后我技术经理说让我在数据库连接上加上rewriteBatchedStatements=true属性,批量新增方法变更为userService.saveBatch(users, users.size());试一下。我们再次清空表试一下两个情况:

增加rewriteBatchedStatements=true

一万次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
StopWatch '测试批量保存': running time = 6351 ms
---------------------------------------------
ms % Task name
---------------------------------------------
000003298 52% 用户保存1
000000503 08% 用户保存2
000000338 05% 用户保存3
000000304 05% 用户保存4
000000402 06% 用户保存5
000000310 05% 用户保存6
000000312 05% 用户保存7
000000299 05% 用户保存8
000000273 04% 用户保存9
000000308 05% 用户保存10

10万次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
StopWatch '测试批量保存': running time = 40913 ms
---------------------------------------------
ms % Task name
---------------------------------------------
000009092 22% 用户保存1
000003494 09% 用户保存2
000003580 09% 用户保存3
000003645 09% 用户保存4
000002939 07% 用户保存5
000002871 07% 用户保存6
000004359 11% 用户保存7
000003043 07% 用户保存8
000002978 07% 用户保存9
000004907 12% 用户保存10

效果惊呆了吧!!!!芜湖,起飞。

原理

那么问题来了,是什么原因导致的性能差异这么大的呢?

我们去掉rewriteBatchedStatements=true参数, 把日志级别调整到debug级别来看看日志输出情况,当然我们也要把我们数量调小些,一方面是为了我们更好的查看运行情况,另一方面也是为了我们能方便进行断点调试我们的代码。

首先我们在我们保存的方法上打个断点,然后运行,先放行,我们查看日志输出情况,我们发现SQL是按照单个的SQL的形式进行批量执行的。

  • 调用MyBatisPlus提供的ServiceImpl的批量保存方法
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 执行批量操作
*
* @param list 数据集合
* @param batchSize 批量大小
* @param consumer 执行方法
* @param <E> 泛型
* @return 操作结果
* @since 3.3.1
*/
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
return SqlHelper.executeBatch(this.entityClass, this.log, list, batchSize, consumer);
}
  • 调用SqlHelper的批量执行方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 执行批量操作
*
* @param entityClass 实体类
* @param log 日志对象
* @param list 数据集合
* @param batchSize 批次大小
* @param consumer consumer
* @param <E> T
* @return 操作结果
* @since 3.4.0
*/
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
int size = list.size();
int idxLimit = Math.min(batchSize, size);
int i = 1;
for (E element : list) {
consumer.accept(sqlSession, element);
if (i == idxLimit) {
sqlSession.flushStatements();
idxLimit = Math.min(idxLimit + batchSize, size);
}
i++;
}
});

从这里我们可以看到,批量保存的方法变成了单个插入的方式,我想这里大概是为了方便兼容各种数据库差异化的插入的处理吧,把批量保存的数据组装成单个的插入语句,到达指定批次数量之后session清空重新组装,而后事务统一提交。

到这里用的基本上都是代理对象了,不太直观的看出调用逻辑了,我们debug到方法里面查看具体调用逻辑。然后直到执行刷新语句(flushStatements)这里,发现使用的是BatchExecutor,具体下执行方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<>();
if (isRollback) {
return Collections.emptyList();
}
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// Close statement to close cursor #1109
closeStatement(stmt);
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}

batchResult.setUpdateCounts(stmt.executeBatch());断点到这里后我们进入方法内部,到com.mysql.cj.jdbc.StatementImpl里面执行:

1
2
3
4
@Override
public int[] executeBatch() throws SQLException {
return Util.truncateAndConvertToInt(executeBatchInternal());
}

然后再到com.mysql.cj.jdbc.ClientPreparedStatement里面执行一下逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {

if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT);
}

if (this.query.getBatchedArgs() == null || this.query.getBatchedArgs().size() == 0) {
return new long[0];
}

// we timeout the entire batch, not individual statements
int batchTimeout = getTimeoutInMillis();
setTimeoutInMillis(0);

resetCancelledState();

try {
statementBegins();

clearWarnings();

if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {

if (getQueryInfo().isRewritableWithMultiValuesClause()) {
return executeBatchWithMultiValuesClause(batchTimeout);
}

if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}

return executeBatchSerially(batchTimeout);
} finally {
this.query.getStatementExecuting().set(false);

clearBatch();
}
}
}

发现最终执行方法:

1
return executeBatchSerially(batchTimeout);

首先查看方法文档,文档说明如下:

Executes the current batch of statements by executing them one-by-one.

executeBatchSerially核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
for (batchCommandIndex = 0; batchCommandIndex < nbrCommands; batchCommandIndex++) {

((PreparedQuery<?>) this.query).setBatchCommandIndex(batchCommandIndex);

Object arg = this.query.getBatchedArgs().get(batchCommandIndex);

try {
if (arg instanceof String) {
updateCounts[batchCommandIndex] = executeUpdateInternal((String) arg, true, this.retrieveGeneratedKeys);

// limit one generated key per OnDuplicateKey statement
getBatchedGeneratedKeys(this.results.getFirstCharOfQuery() == 'I' && containsOnDuplicateKeyInString((String) arg) ? 1 : 0);
} else {
QueryBindings<?> queryBindings = (QueryBindings<?>) arg;
updateCounts[batchCommandIndex] = executeUpdateInternal(queryBindings, true);

// limit one generated key per OnDuplicateKey statement
getBatchedGeneratedKeys(containsOnDuplicateKeyUpdateInSQL() ? 1 : 0);
}
} catch (SQLException ex) {
updateCounts[batchCommandIndex] = EXECUTE_FAILED;

if (this.continueBatchOnError && !(ex instanceof MySQLTimeoutException) && !(ex instanceof MySQLStatementCancelledException)
&& !hasDeadlockOrTimeoutRolledBackTx(ex)) {
sqlEx = ex;
} else {
long[] newUpdateCounts = new long[batchCommandIndex];
System.arraycopy(updateCounts, 0, newUpdateCounts, 0, batchCommandIndex);

throw SQLError.createBatchUpdateException(ex, newUpdateCounts, this.exceptionInterceptor);
}
}
}

通过代码分析,也确实是一条一条SQL执行,而不是把batch的SQL发送到服务器

但是

重点来了,执行executeBatchSerially是有条件的,再次贴一下源码:

1
2
3
4
if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
...
}
return executeBatchSerially(batchTimeout);

也就是说,如果没做任何配置,默认情况下if条件是进不去的,会直接执行if块后边的语句的,那么if判断了什么,判断如下:

1
!``this``.batchHasPlainStatements && ``this``.connection.getRewriteBatchedStatements()

batchHasPlainStatements默认初始化就是false,可以不用管,重点是connection.getRewriteBatchedStatements()。这个是Connection的一个参数rewriteBatchedStatements,会在读取jdbcUrl的时候读取进来:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true

我们将jdbcUrl添加一个**rewriteBatchedStatements**试试,即变成:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&***rewriteBatchedStatements=true

这时候就会进if块了。进入if块之后,再根据执行的是insert 还是update、 delete,会走不同方法

  • 如果是insert语句,满成条件情况下,会整合成形如:”insert into xxx_table values (xx),(yy),(zz)…“这样的语句
  • 如果是update\delete语句,满成条件情况下,会整合成形如:”update t set … where id = 1; update t set … where id = 2; update t set … where id = 3 …“这样的语句

然后分批次发送给MySQL(会有一次发送的package大小限制,所以需要拆分批次),拆分如下:

1
int` `maxAllowedPacket = ``this``.connection.getMaxAllowedPacket();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
//insert会在这里进行再次判断
if (canRewriteAsMultiValueInsertAtSqlLevel()) {
return executeBatchedInserts(batchTimeout);
}
//update、delete会在这里进行再次判断
//1. mysql版本>=4.1.0
//2. batchHasPlainStatements为false
//3. batch的数量>3
if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
&& this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}

return executeBatchSerially(batchTimeout);

总结

OK,知其然,知其所以然,我们大概知晓了参数的功能及意义,那么我们总结一下。

如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:

  • 需要在jdbcUrl后添加参数rewriteBatchedStatements=true
  • this.batchHasPlainStatements 为false
  • 如果是update \ delete 语句,还需要mysql版本>=4.1.0,并且batch的数量>3

因此,如果可能的情况下,请在jdbcUrl后添加参数rewriteBatchedStatements=true,尽可能利用上MySQL给我们提供的便利,提高性能。

结尾ENDING:还是着重强调一下SQL方言为MySQL,这里只是针对MySQL的性能细究,请注意一下数据库!!!


JDBC连接参数【rewriteBatchedStatements】详解
https://github.com/yangxiangnanwill/yangxiangnanwill.github.io/2024/01/03/好好码代码吖/JAVA/MyBatis/JDBC连接参数【rewriteBatchedStatements】详解/
作者
will
发布于
2024年1月3日
许可协议