批量处理工具类:使用 Java 8 函数式操作(Stream API)

    1. //
    2. // Source code recreated from a .class file by IntelliJ IDEA
    3. // (powered by FernFlower decompiler)
    4. //
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Objects;
    import java.util.function.Consumer;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;
    /**
     * Static helpers that split a workload into fixed-size batches and hand each
     * batch to a callback.
     *
     * <p>Callbacks are dispatched from a parallel stream, so they may run
     * concurrently and in no particular order; callbacks must therefore be safe
     * to invoke from several threads at once.
     */
    public final class BatchJobUtils {

        /**
         * Batch size used by the overloads that do not take an explicit size.
         * Left public and non-final so existing code that reads or reassigns it
         * keeps compiling.
         */
        public static int DEFAULT_MAX_BATCH_SIZE = 1000;

        /** Kept public for source compatibility; every member of this class is static. */
        public BatchJobUtils() {
        }

        /**
         * Returns the number of batches needed for {@code size} items when using
         * {@link #DEFAULT_MAX_BATCH_SIZE} as the batch size.
         *
         * @param size number of items to process
         * @return the batch count, never less than 1
         * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
         */
        public static int countStep(int size) {
            return countStep(size, DEFAULT_MAX_BATCH_SIZE);
        }

        /**
         * Returns the number of batches needed for {@code size} items when each
         * batch holds at most {@code limit} items.
         *
         * @param size  number of items to process
         * @param limit maximum number of items per batch; must be positive
         * @return {@code ceil(size / limit)}, but never less than 1, so an empty
         *         workload still counts as one (empty) batch
         * @throws IllegalArgumentException if {@code limit} is not positive
         */
        public static int countStep(int size, int limit) {
            if (limit <= 0) {
                // Fail with a descriptive message instead of a bare division by zero.
                throw new IllegalArgumentException("limit must be positive: " + limit);
            }
            if (size < limit) {
                return 1;
            }
            int step = size / limit;
            return size % limit == 0 ? step : step + 1;
        }

        /**
         * Invokes {@code consumer} once per batch index, from {@code 0} up to
         * {@code countStep(size) - 1}, using a parallel stream.
         *
         * @param size     total number of items; only used to derive the batch count
         * @param consumer callback receiving the zero-based batch index
         * @throws NullPointerException     if {@code consumer} is null
         * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
         */
        public static void doBatch(int size, Consumer<Integer> consumer) {
            Objects.requireNonNull(consumer, "consumer");
            int step = countStep(size, DEFAULT_MAX_BATCH_SIZE);
            IntStream.range(0, step).parallel().boxed().forEach(consumer);
        }

        /**
         * Splits {@code list} into batches of at most {@link #DEFAULT_MAX_BATCH_SIZE}
         * elements and passes each batch to {@code consumer}.
         *
         * @param list     elements to partition, in the collection's iteration order
         * @param consumer callback receiving one batch per invocation
         * @param <T>      element type
         * @throws NullPointerException     if {@code list} or {@code consumer} is null
         * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
         */
        public static <T> void doBatch(Collection<T> list, Consumer<Collection<T>> consumer) {
            doBatch(list, DEFAULT_MAX_BATCH_SIZE, consumer);
        }

        /**
         * Splits {@code list} into batches of at most {@code batchSize} elements
         * and passes each batch to {@code consumer} from a parallel stream.
         *
         * <p>The input is copied once up front, so every batch is cut from the
         * same snapshot and each batch handed to the callback is an independent
         * list. An empty input produces a single call with an empty batch, which
         * matches the count reported by {@link #countStep(int, int)}.
         *
         * @param list      elements to partition, in the collection's iteration order
         * @param batchSize maximum number of elements per batch; must be positive
         * @param consumer  callback receiving one batch per invocation
         * @param <T>       element type
         * @throws NullPointerException     if {@code list} or {@code consumer} is null
         * @throws IllegalArgumentException if {@code batchSize} is not positive
         */
        public static <T> void doBatch(Collection<T> list, int batchSize, Consumer<Collection<T>> consumer) {
            Objects.requireNonNull(list, "list");
            Objects.requireNonNull(consumer, "consumer");
            // Copy once: slicing by index avoids re-walking the source for every batch.
            List<T> snapshot = new ArrayList<>(list);
            int total = snapshot.size();
            int step = countStep(total, batchSize);
            IntStream.range(0, step).parallel().forEach(index -> {
                // Widen to long before multiplying so large index * batchSize cannot overflow.
                int from = (int) Math.min((long) index * batchSize, total);
                int to = (int) Math.min((long) from + batchSize, total);
                List<T> batch = new ArrayList<>(snapshot.subList(from, to));
                consumer.accept(batch);
            });
        }
    }

    该工具类的使用示例

    1. // 查询名称重复的供应商
    2. CountDownLatch querySupplierCountDownLatch = new CountDownLatch(BatchJobUtils.countStep(supplierNameSet.size()));
    3. // doBatch 前一个是要消费的数据,后面是消费的具体逻辑,这里使用到了多线程去处理逻辑,组装了对象,十分的方便
    4. BatchJobUtils.doBatch(Arrays.asList(supplierNameSet.toArray()), subList -> {
    5. threadPoolTaskExecutor.execute(() -> {
    6. // 供应商查重
    7. List<JdiSupplierDto> supplierDtoList = this.jdiSupplierService.listBySupplierNames(subList, jdiImportExportOperateMsg.getShopId());
    8. if (CollectionUtils.isNotEmpty(supplierDtoList)) {
    9. for (JdiSupplierDto supplierDto : supplierDtoList) {
    10. supplierMap.put(supplierDto.getSupplierName(), supplierDto);
    11. }
    12. }
    13. querySupplierCountDownLatch.countDown();
    14. });
    15. });
    16. try {
    17. querySupplierCountDownLatch.await(30, TimeUnit.SECONDS);
    18. } catch (InterruptedException e) {
    19. throw new BasicException("importAddOrModifyJdiSupplierArray.findJdiSupplierBySupplierNames", ResultEnum.OUTER_SERVICE_EXCEPTION.getMessage(), e);
    20. }