1. 批量新增

1. MyBiats标签“foreach”

“foreach”的主要用在构建“in”条件中,它可以在SQL语句中进行迭代一个集合。“foreach”元素的属性主要有:“item”、“index”、“collection”、“open”、“separator”、“close”。“item”表示集合中每一个元素进行迭代时的别名,“index”指 定一个名字,用于表示在迭代过程中,每次迭代到的位置,“open”表示该语句以什么开始,“separator”表示在每次进行迭代之间以什么符号作为分隔 符,“close”表示以什么结束。
以下图例为批量新增100万行记录,单行和多行方式比较情况。
MyBatis批量操作 - 图1
在使用“foreach”的时候最关键的也是最容易出错的就是“collection”属性,该属性是必须指定的,但是在不同情况 下,该属性的值是不一样的,主要有一下3种情况:

  1. 如果传入的是单参数且参数类型是一个List的时候,collection属性值为list。
  2. 如果传入的是单参数且参数类型是一个array数组的时候,collection的属性值为array。
  3. 如果传入的参数是多个的时候,我们就需要把它们封装成一个Map了。

具体用法如下(MyBatis示例):

  1. <!-- Oracle示例(字段固定不变) -->
  2. <insert id="insertBatch" parameterType="List">
  3. INSERT INTO TEST(c1,c2)
  4. <foreach collection="list" item="item" index="index" open="("close=")"separator="union all">
  5. SELECT #{item.c1} as a, #{item.c2} as b FROM DUAL
  6. </foreach>
  7. </insert>
  8. <!-- MySQL、DB2示例1(字段固定不变) -->
  9. <insert id="insertBatch" >
  10. insert into TEST(c1,c2)
  11. values
  12. <foreach collection="list" item="item" index="index" separator=",">
  13. (#{item.c1},#{item.c2})
  14. </foreach>
  15. </insert>
  16. <!-- MySQL、DB2示例2(字段动态变化) -->
  17. <insert id="insertBatch">
  18. insert into TEST
  19. <foreach collection="fields" item="field" open="(" close=")" separator=",">
  20. <choose>
  21. <when test="field == 'C1'">
  22. C1
  23. </when>
  24. <when test="field == 'C2'">
  25. C2
  26. </when>
  27. </choose>
  28. </foreach>
  29. values
  30. <foreach collection ="values" item="val" index= "index" separator =",">
  31. <trim prefix="(" suffix=")" suffixOverrides=",">
  32. <if test=" val.c1 != null ">
  33. #{val.c1},
  34. </if>
  35. <if test=" val.c2 != null ">
  36. #{val.c2},
  37. </if>
  38. </trim>
  39. </foreach >
  40. </insert>

2. MyBatis SQL执行器ExecutorType.BATCH

MyBatis内置的ExecutorType有3种,默认的是“simple”,该模式下它为每个语句的执行创建一个新的预处理语句,单条提交sql;而“batch”模式重复使用已经预处理的语句,并且批量执行所有更新语句,显然“batch”性能将更优; 但“batch”模式也有自己的问题,比如在“Insert”操作时,在事务没有提交之前,是没有办法获取到自增的“id”,这在某型情形下是不符合业务要求的。具体用法如下:

  1. xxxMapper方式。

    1. // 获取sqlsession,从spring注入原有的sqlSessionTemplate
    2. @Autowired
    3. private SqlSessionTemplate sqlSessionTemplate;
    4. // 新获取一个模式为BATCH,自动提交为false的session
    5. // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
    6. SqlSession session = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH,false);
    7. // 通过新的session获取mapper
    8. FooMapper fooMapper = session.getMapper(FooMapper.class);
    9. int size = 10000;
    10. try{
    11. for(int i = 0; i < size; i++) {
    12. Foo foo = new Foo();
    13. foo.setC1(String.valueOf(System.currentTimeMillis()));
    14. foo.setC2("c2-value");
    15. fooMapper.insert(foo);
    16. if(i % 1000 == 0 || i == size - 1) {
    17. // 手动每1000个一提交,提交后无法回滚
    18. session.commit();
    19. // 清理缓存,防止溢出
    20. session.clearCache();
    21. }
    22. }
    23. } catch (Exception e) {
    24. // 没有提交的数据可以回滚
    25. session.rollback();
    26. } finally{
    27. session.close();
    28. }
  2. 命名空间方式。

    1. public void insertBatch(Map<String,Object> paramMap, List<Foo> list) throws Exception {
    2. // 新获取一个模式为BATCH,自动提交为false的session
    3. // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
    4. SqlSession session = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false);
    5. try {
    6. if(null != list || list.size()>0){
    7. int lsize=list.size();
    8. for (int i = 0, n=list.size(); i < n; i++) {
    9. Foo _entity= list.get(i);
    10. _entity.setC1(String.valueOf(System.currentTimeMillis()));
    11. _entity.setC2("c2-value"String.valueOf(System.currentTimeMillis()));
    12. //session.insert("org.polaris.mapper.FooMapper.insertList",_entity);
    13. //session.update("org.polaris.mapper.FooMapper.updateBatchByPrimaryKeySelective",_entity);
    14. session.insert("${包类名}.${操作名}", _entity);
    15. if ((i>0 && i % 1000 == 0) || i == lsize - 1) {
    16. // 手动每1000个一提交,提交后无法回滚
    17. session.commit();
    18. // 清理缓存,防止溢出
    19. session.clearCache();
    20. }
    21. }
    22. }
    23. } catch (Exception e) {
    24. // 没有提交的数据可以回滚
    25. session.rollback();
    26. } finally {
    27. session.close();
    28. }
    29. }

    2. 批量删除

    MyBatis示例:

    1. <delete id="deleteXXX" parameterType="java.util.List">
    2. DELETE FROM TEST
    3. WHERE id IN
    4. <foreach collection="ids" item="id" open="(" close=")" separator=",">
    5. #{id}
    6. </foreach>
    7. </delete>

    3. 批量更新

    1. 通过参数${list}组装SQL

    1. <update id="updateBatch" parameterType="java.util.List" >
    2. <foreach collection="list" item="item" index="index" open="" close="" separator=";">
    3. update TEST
    4. <set >
    5. <if test="item.c1 != null" >
    6. c1 = #{item.c1,jdbcType=VARCHAR},
    7. </if>
    8. <if test="item.c2 != null" >
    9. c2 = #{item.c2,jdbcType=VARCHAR},
    10. </if>
    11. </set>
    12. where id = #{item.id,jdbcType=BIGINT}
    13. </foreach>
    14. </update>

    2. 通过“case when”语句变相批量更新

    1. <update id="updateBatch" parameterType="java.util.List" >
    2. update TEST
    3. <trim prefix="set" suffixOverrides=",">
    4. <trim prefix="c1 =case" suffix="end,">
    5. <foreach collection="list" item="item" index="index">
    6. <if test="item.c1!=null">
    7. when id=#{item.id} then #{item.c1}
    8. </if>
    9. </foreach>
    10. </trim>
    11. <trim prefix="c2 =case" suffix="end,">
    12. <foreach collection="list" item="item" index="index">
    13. <if test="item.c2!=null">
    14. when id=#{item.id} then #{item.c2}
    15. </if>
    16. </foreach>
    17. </trim>
    18. </trim>
    19. where
    20. <foreach collection="list" separator="or" item="item" index="index" >
    21. id=#{item.id}
    22. </foreach>
    23. </update>

    3. 通过“DUPLICATE KEY UPDATE”/“MERGE INTO”批量更新

    1. <insert id="updateBatch" parameterType="java.util.List">
    2. insert into TEST( id,c1, c2 )
    3. VALUES
    4. <foreach collection="list" item="item" index="index" separator=",">
    5. (
    6. #{item.id,jdbcType=BIGINT},#{item.c1,jdbcType=VARCHAR}, #{item.c2,jdbcType=VARCHAR}
    7. )
    8. </foreach>
    9. ON DUPLICATE KEY UPDATE
    10. c1 = VALUES(c1),c2 = VALUES(c2)
    11. </insert>

    4. 批量合并汇入/插入更新

    1. Oracle(MERGE INTO)

    SQL语法:

    1. MERGE INTO table_name alias1
    2. USING (table|view|sub_query) alias2
    3. ON (join condition)
    4. WHEN MATCHED THEN
    5. UPDATE table_name
    6. SET col1 = col_val1,
    7. col2 = col_val2
    8. WHEN NOT MATCHED THEN
    9. INSERT (column_list) VALUES (column_values)
    10. ;

    MyBatis示例:

    1. <update id="mergeinfo">
    2. merge into TEST a
    3. using ( select #{id} as id, #{c1} as c1, #{c2} as c2 from dual ) b
    4. on (a.id = b.id)
    5. when not matched then
    6. insert (c1,c2) values(#{c1},#{c2})
    7. when matched then
    8. update set c1 = #{c1}
    9. where c2 = #{c2}
    10. </update>

    2. DB2(MERGE INTO)

    SQL示例:

    1. # 支持多行记录操作
    2. MERGE INTO BASE.TEST AS t1
    3. USING TABLE (VALUES(?,?,?)) t2(id,c1,c2)
    4. ON t1.id=t2.id
    5. when matched then
    6. update set t1.c1=t2.c1
    7. when not matched then
    8. insert (id,c1,c2) values(t2.id,t2.c1,t2.c2)
    9. ;

    3. MySQL(duplicate key update)

    SQL示例:

    1. # 支持多行记录操作
    2. -- 全部字段
    3. insert into TEST(id,c1,c2 ) select * from TMP on duplicate key update c1=values(c1),c2=values(c2);
    4. -- 部分字段
    5. insert into TEST(id,c1) select id,c1 from TMP on duplicate key update c1=values(c1);

    5. 工具类

    ```java package org.polaris.dev.Executor;

import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionTemplate; import org.polaris.dev.mapper.TgkhMapper; import org.polaris.dev.util.DataUtil; import org.polaris.dev.vo.Tgkh; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;

/**

  • @author polaris 450733605@qq.com
  • @Description 启动mybatis的Batch模式的批量新增、更新
  • @Date 2021-4-22 10:09
  • @Version 1.0.0 */ @Component public class MybatisBatchExecutor {

    /**

    • 每多少条记录拼接一次SQL
    • 拼接SQL的最大数据量(即:每次accept时,容忍的list数据条数,如:foreach,接收list.size条记录,拼接为单条SQL)
    • insert:支持把${batchCountToSQL}条数的数据使用foreach拼写成一个大SQL
    • update:支持把${batchCountToSQL}条数的数据拼写成case when方式的大SQL
    • merge:支持把${batchCountToSQL}条数的数据使用merge into语法或ON DUPLICATE KEY UPDATE语法拼写成一个大SQL
    • select:支持把${batchCountToSQL}条数的数据使用union all拼写成一个大SQL
      */ private static final int batchCountToSQL = 25;

      /**

    • 每多少次批量执行一次提交 */ private static final int batchCountToSubmit = 100;

      @Resource private SqlSessionTemplate sqlSessionTemplate;

      /**

    • 批量更新、新增
    • @param dbList
    • @param mapperClass
    • @param func
    • @param
    • @param */ public void insertOrUpdateBatch(List dbList, Class mapperClass, BiConsumer> func) { int batchLastIndex = batchCountToSQL; int batchLastIndexToSubmit = 0; int total = dbList.size();

      SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory(); SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); M modelMapper = sqlSession.getMapper(mapperClass); try {

      1. if (total > batchCountToSQL) {
      2. // 多个批次情况
      3. for (int index = 0; index < total; ) {
      4. if (batchLastIndex > total) {
      5. // 最后一个批次,处理完成,结束循环体
      6. List<T> list = dbList.subList(index, total);
      7. func.accept(modelMapper, list);
      8. sqlSession.flushStatements();
      9. sqlSession.commit();
      10. break;
      11. } else {
      12. // 攒批,达到${batchCountToSubmit}次批量以后,提交
      13. List<T> list = dbList.subList(index, batchLastIndex);
      14. func.accept(modelMapper, list);
      15. if (batchLastIndexToSubmit++ >= batchCountToSubmit) {
      16. // 如果可以批量提交,则提交
      17. sqlSession.flushStatements();
      18. sqlSession.commit();
      19. batchLastIndexToSubmit = 0;
      20. }
      21. index = batchLastIndex; // 设置下一批下标
      22. batchLastIndex = index + (batchCountToSQL - 1);
      23. }
      24. }
      25. } else {
      26. // 不足一个批量时,直接提交
      27. func.accept(modelMapper, dbList);
      28. sqlSession.commit();
      29. }

      } finally {

      1. closeSqlSession(sqlSession, sqlSessionFactory);

      } }

      public static void main(String[] args) { MybatisBatchExecutor mybatisBatchExecutor = new MybatisBatchExecutor(); // 使用过程list赋值为实际数据 List list=null;

      // 批量插入 mybatisBatchExecutor.insertOrUpdateBatch(list, TgkhMapper.class, (mapper, data)-> {

      1. List<String> fields = DataUtil.getFieldsByColumnsName(data);
      2. mapper.batchInsertSample(data,fields);

      });

      // 批量Meger mybatisBatchExecutor.insertOrUpdateBatch(list, TgkhMapper.class, (mapper, data)-> {

      1. List<String> fields = DataUtil.getFieldsByColumnsName(data);
      2. mapper.batchMergeSample(data,fields);

      });

      // 批量查询 List bigList=new ArrayList<>(); mybatisBatchExecutor.insertOrUpdateBatch(list, TgkhMapper.class, (mapper, data)-> {

      1. List<Tgkh> primaryKeyList = mapper.findByPrimaryKey(data);
      2. bigList.addAll(primaryKeyList);

      }); } }

      1. <a name="cCfvG"></a>
      2. # 6. 注意事项
      3. <a name="m0ovW"></a>
      4. ## 1. 开启批量操作支持参数
      5. MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。MySQL JDBC驱动在默认情况下会无视“executeBatch()”语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。另外这个选项对“INSERT/UPDATE/DELETE”都有效。

      spring.datasource.url=jdbc:mysql://127.0.0.1:3306/testdb?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&rewriteBatchedStatements=true

      1. 添加这个参数后的执行速度比较(同一个表插入一万条数据时间近似值):JDBC BATCH 1.1秒左右 > Mybatis BATCH 2.2秒左右 > 拼接SQL 4.5秒左右。
      2. <a name="l5M1x"></a>
      3. ## 2. “foreach”使用注意点
      4. foreach后有多个values5000+)时PreparedStatement随之增长,包含了很多占位符,对于占位符和参数的映射尤其耗时。查阅相关[资料](https://www.red-gate.com/simple-talk/sql/performance/comparing-multiple-rows-insert-vs-single-row-insert-with-three-data-load-methods/)可知,values的增长与所需的解析时间,是呈指数型增长的。<br />![](https://cdn.nlark.com/yuque/0/2021/png/788484/1619055038554-5ad12a57-acf6-4c5c-9c19-1ef181670248.png#clientId=u134ed5c8-a4de-4&from=paste&height=445&id=ude7215fb&originHeight=445&originWidth=620&originalType=url&status=done&style=none&taskId=u6d715235-2312-49a0-9b6d-5620a30fa2b&width=620)<br />如果非要使用“foreach”的方式来进行批量插入的话,可以考虑减少一条“insert”语句中“values”的个数,最好能达到上面曲线的最底部的值,使速度最快。一般按经验来说,一次性插20~50行数量是比较合适的,时间消耗也能接受。
      5. <a name="oJjig"></a>
      6. ## 3. 大SQL内容过大异常
      7. MySQL-5.x数据,插入或更新字段有大数据时(大于1M),会出现如下错误:“**Packet for query is too large (1117260 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.**”<br />问题分析:mysql默认加载的数据文件不超过1M,可以通过更改mysql的配置文件my.cnf(Linux,或windows的my.ini)来更改这一默认值,从而达到插入大数据的目的。<br />解决方案:调整MySQL系统参数“max_allowed_packet”。
      8. ```sql
      9. show VARIABLES like '%max_allowed_packet%';
      10. SET GLOBAL max_allowed_packet=16*1024*1024;
      11. set @@max_allowed_packet=16*1024*1024;

      参考

      简书:批处理“rewriteBatchedStatements=true”
      https://www.jianshu.com/p/0f4b7bc4d22c
      CSDN:MyBatis批量插入几千条数据慎用foreach
      https://blog.csdn.net/huanghanqian/article/details/83177178
      https://www.red-gate.com/simple-talk/sql/performance/comparing-multiple-rows-insert-vs-single-row-insert-with-three-data-load-methods/