前言
最近博猪在做一些风险测算相关的工作,大体业务流程就是业务人员通过Excel录入数据后,后台通过默认的风险因子计算后,批量入库,同时会把业务人员导入的测算数据的Excel上传,由于这块操作比较多,但是因为后台使用的ORM框架是MybatisPlus,里面提供的一些API还是挺便捷的,但是同步通过输入测验发现一个问题,下面就以一个案例进入一下今天的主题,细扒一下MyBatisPlus的批量插入操作。
rewriteBatchedStatements参数
MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL,另外这个选项对INSERT/UPDATE/DELETE都有效
添加rewriteBatchedStatements=true
这个参数后的执行速度比较:
1
| jdbc:mysql://数据库地址/数据库名?useUnicode=true&characterEncoding=UTF8&allowMultiQueries=true&rewriteBatchedStatements=true
|
测试环境介绍
测试代码在本地执行,数据库使用本地虚拟机。
宿主环境
- 物理机处理为AMD Ryzen 9 5900HX with Radeon Graphics 3.30 GHz 48G运行内存 WIN11(OS)
- MySQL为8.0.30,虚拟机(4G运存,20G内存)
- jdk版本为1.8
- 日志级别调整到info或者warn级别,减少日志打印性能的消耗
代码
表结构
1 2 3 4 5 6 7
| CREATE TABLE `user` ( `id` bigint NOT NULL COMMENT '主键ID', `name` varchar(30) DEFAULT NULL COMMENT '姓名', `age` int DEFAULT NULL COMMENT '年龄', `email` varchar(50) DEFAULT NULL COMMENT '邮箱', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
|
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = App.class) public class BatchDemo {
@Autowired private UserService userService;
@Test public void testSave() { StopWatch stopWatch = new StopWatch("测试批量保存"); int loopNum = 10; int batchNum = 10_000; for (int i = 1; i <= loopNum; i++) { stopWatch.start("用户保存" + i); ArrayList<User> users = new ArrayList<>(); for (int j = 1; j <= batchNum; j++) { User user = User.builder() .id(IdWorker.getId()) .name("test-") .age(16) .email("@qq.com") .build(); users.add(user); } userService.saveBatch(users, users.size()); stopWatch.stop(); System.out.println(stopWatch.prettyPrint(TimeUnit.MILLISECONDS)); } } }
|
测试
普通saveBatch批量插入
我们循环1万次,把每个实例对象装到集合(List)中,然后调用Mybatis-Plus的saveBatch方法,传入List集合,实现批量对象的插入,然后我们在方法开始结束的地方,计算当前函数执行时长。为了测试严谨性,我们重复执行十次,下面是执行的时长:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| StopWatch '测试批量保存': running time = 88867 ms --------------------------------------------- ms % Task name --------------------------------------------- 000011977 13% 用户保存1 000008739 10% 用户保存2 000008547 10% 用户保存3 000008636 10% 用户保存4 000008610 10% 用户保存5 000008490 10% 用户保存6 000008382 09% 用户保存7 000008287 09% 用户保存8 000008624 10% 用户保存9 000008570 10% 用户保存10
|
OK,我们清空表结构,把数据量调整到10万次,,同时调整一下保存操作为userService.saveBatch(users);
执行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| StopWatch '测试批量保存': running time = 888607 ms --------------------------------------------- ms % Task name --------------------------------------------- 000093144 10% 用户保存1 000087531 10% 用户保存2 000085724 10% 用户保存3 000092762 10% 用户保存4 000085413 10% 用户保存5 000090576 10% 用户保存6 000087468 10% 用户保存7 000086718 10% 用户保存8 000089837 10% 用户保存9 000089432 10% 用户保存10
|
这性能差异立马上来了,然后我技术经理说让我在数据库连接上加上rewriteBatchedStatements=true
属性,批量新增方法变更为userService.saveBatch(users, users.size());
试一下。我们再次清空表试一下两个情况:
增加rewriteBatchedStatements=true
一万次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| StopWatch '测试批量保存': running time = 6351 ms --------------------------------------------- ms % Task name --------------------------------------------- 000003298 52% 用户保存1 000000503 08% 用户保存2 000000338 05% 用户保存3 000000304 05% 用户保存4 000000402 06% 用户保存5 000000310 05% 用户保存6 000000312 05% 用户保存7 000000299 05% 用户保存8 000000273 04% 用户保存9 000000308 05% 用户保存10
|
10万次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| StopWatch '测试批量保存': running time = 40913 ms --------------------------------------------- ms % Task name --------------------------------------------- 000009092 22% 用户保存1 000003494 09% 用户保存2 000003580 09% 用户保存3 000003645 09% 用户保存4 000002939 07% 用户保存5 000002871 07% 用户保存6 000004359 11% 用户保存7 000003043 07% 用户保存8 000002978 07% 用户保存9 000004907 12% 用户保存10
|
效果惊呆了吧!!!!芜湖,起飞。
原理
那么问题来了,是什么原因导致的性能差异这么大的呢?
我们去掉rewriteBatchedStatements=true
参数, 把日志级别调整到debug
级别来看看日志输出情况,当然我们也要把我们数量调小些,一方面是为了我们更好的查看运行情况,另一方面也是为了我们能方便进行断点调试我们的代码。
首先我们在我们保存的方法上打个断点,然后运行,先放行,我们查看日志输出情况,我们发现SQL是按照单个的SQL的形式进行批量执行的。
- 调用MyBatisPlus提供的ServiceImpl的批量保存方法
1 2 3 4 5 6 7 8 9 10 11 12 13
|
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) { return SqlHelper.executeBatch(this.entityClass, this.log, list, batchSize, consumer); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) { Assert.isFalse(batchSize < 1, "batchSize must not be less than one"); return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> { int size = list.size(); int idxLimit = Math.min(batchSize, size); int i = 1; for (E element : list) { consumer.accept(sqlSession, element); if (i == idxLimit) { sqlSession.flushStatements(); idxLimit = Math.min(idxLimit + batchSize, size); } i++; } });
|
从这里我们可以看到,批量保存的方法变成了单个插入的方式,我想这里大概是为了方便兼容各种数据库差异化的插入的处理吧,把批量保存的数据组装成单个的插入语句,到达指定批次数量之后session清空重新组装,而后事务统一提交。
到这里用的基本上都是代理对象了,不太直观的看出调用逻辑了,我们debug到方法里面查看具体调用逻辑。然后直到执行刷新语句(flushStatements
)这里,发现使用的是BatchExecutor
,具体下执行方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Override public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException { try { List<BatchResult> results = new ArrayList<>(); if (isRollback) { return Collections.emptyList(); } for (int i = 0, n = statementList.size(); i < n; i++) { Statement stmt = statementList.get(i); applyTransactionTimeout(stmt); BatchResult batchResult = batchResultList.get(i); try { batchResult.setUpdateCounts(stmt.executeBatch()); MappedStatement ms = batchResult.getMappedStatement(); List<Object> parameterObjects = batchResult.getParameterObjects(); KeyGenerator keyGenerator = ms.getKeyGenerator(); if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) { Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator; jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects); } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141 for (Object parameter : parameterObjects) { keyGenerator.processAfter(this, ms, stmt, parameter); } } // Close statement to close cursor #1109 closeStatement(stmt); } catch (BatchUpdateException e) { StringBuilder message = new StringBuilder(); message.append(batchResult.getMappedStatement().getId()) .append(" (batch index #") .append(i + 1) .append(")") .append(" failed."); if (i > 0) { message.append(" ") .append(i) .append(" prior sub executor(s) completed successfully, but will be rolled back."); } throw new BatchExecutorException(message.toString(), e, results, batchResult); } results.add(batchResult); } return results; } finally { for (Statement stmt : statementList) { closeStatement(stmt); } currentSql = null; statementList.clear(); batchResultList.clear(); } }
|
batchResult.setUpdateCounts(stmt.executeBatch());
断点到这里后我们进入方法内部,到com.mysql.cj.jdbc.StatementImpl
里面执行:
1 2 3 4
| @Override public int[] executeBatch() throws SQLException { return Util.truncateAndConvertToInt(executeBatchInternal()); }
|
然后再到com.mysql.cj.jdbc.ClientPreparedStatement
里面执行一下逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Override protected long[] executeBatchInternal() throws SQLException { synchronized (checkClosed().getConnectionMutex()) {
if (this.connection.isReadOnly()) { throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT); }
if (this.query.getBatchedArgs() == null || this.query.getBatchedArgs().size() == 0) { return new long[0]; }
int batchTimeout = getTimeoutInMillis(); setTimeoutInMillis(0);
resetCancelledState();
try { statementBegins();
clearWarnings();
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
if (getQueryInfo().isRewritableWithMultiValuesClause()) { return executeBatchWithMultiValuesClause(batchTimeout); }
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null && this.query.getBatchedArgs().size() > 3 ) { return executePreparedBatchAsMultiStatement(batchTimeout); } }
return executeBatchSerially(batchTimeout); } finally { this.query.getStatementExecuting().set(false);
clearBatch(); } } }
|
发现最终执行方法:
1
| return executeBatchSerially(batchTimeout);
|
首先查看方法文档,文档说明如下:
Executes the current batch of statements by executing them one-by-one.
executeBatchSerially核心源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| for (batchCommandIndex = 0; batchCommandIndex < nbrCommands; batchCommandIndex++) {
((PreparedQuery<?>) this.query).setBatchCommandIndex(batchCommandIndex);
Object arg = this.query.getBatchedArgs().get(batchCommandIndex);
try { if (arg instanceof String) { updateCounts[batchCommandIndex] = executeUpdateInternal((String) arg, true, this.retrieveGeneratedKeys);
getBatchedGeneratedKeys(this.results.getFirstCharOfQuery() == 'I' && containsOnDuplicateKeyInString((String) arg) ? 1 : 0); } else { QueryBindings<?> queryBindings = (QueryBindings<?>) arg; updateCounts[batchCommandIndex] = executeUpdateInternal(queryBindings, true);
getBatchedGeneratedKeys(containsOnDuplicateKeyUpdateInSQL() ? 1 : 0); } } catch (SQLException ex) { updateCounts[batchCommandIndex] = EXECUTE_FAILED;
if (this.continueBatchOnError && !(ex instanceof MySQLTimeoutException) && !(ex instanceof MySQLStatementCancelledException) && !hasDeadlockOrTimeoutRolledBackTx(ex)) { sqlEx = ex; } else { long[] newUpdateCounts = new long[batchCommandIndex]; System.arraycopy(updateCounts, 0, newUpdateCounts, 0, batchCommandIndex);
throw SQLError.createBatchUpdateException(ex, newUpdateCounts, this.exceptionInterceptor); } } }
|
通过代码分析,也确实是一条一条SQL执行,而不是把batch的SQL发送到服务器
但是
重点来了,执行executeBatchSerially是有条件的,再次贴一下源码:
1 2 3 4
| if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { ... } return executeBatchSerially(batchTimeout);
|
也就是说,如果没做任何配置,默认情况下if条件是进不去的,会直接执行if块后边的语句的,那么if判断了什么,判断如下:
1
| !``this``.batchHasPlainStatements && ``this``.connection.getRewriteBatchedStatements()
|
batchHasPlainStatements
默认初始化就是false,可以不用管,重点是connection.getRewriteBatchedStatements()
。这个是Connection的一个参数rewriteBatchedStatements
,会在读取jdbcUrl
的时候读取进来:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
我们将jdbcUrl添加一个**rewriteBatchedStatements
**试试,即变成:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&***rewriteBatchedStatements=true
这时候就会进if块了。进入if块之后,再根据执行的是insert 还是update、 delete,会走不同方法
- 如果是insert语句,满成条件情况下,会整合成形如:”insert into xxx_table values (xx),(yy),(zz)…“这样的语句
- 如果是update\delete语句,满成条件情况下,会整合成形如:”update t set … where id = 1; update t set … where id = 2; update t set … where id = 3 …“这样的语句
然后分批次发送给MySQL(会有一次发送的package大小限制,所以需要拆分批次),拆分如下:
1
| int` `maxAllowedPacket = ``this``.connection.getMaxAllowedPacket();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { if (canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null && this.batchedArgs.size() > 3 ) { return executePreparedBatchAsMultiStatement(batchTimeout); } } return executeBatchSerially(batchTimeout);
|
总结
OK,知其然,知其所以然,我们大概知晓了参数的功能及意义,那么我们总结一下。
如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:
- 需要在jdbcUrl后添加参数rewriteBatchedStatements=true
- this.batchHasPlainStatements 为false
- 如果是update \ delete 语句,还需要mysql版本>=4.1.0,并且batch的数量>3
因此,如果可能的情况下,请在jdbcUrl后添加参数rewriteBatchedStatements=true,尽可能利用上MySQL给我们提供的便利,提高性能。
结尾ENDING:还是着重强调一下SQL方言为MySQL,这里只是针对MySQL的性能细究,请注意一下数据库!!!