批量处理工具类:使用 Java 8 函数式操作(Stream 与 Lambda)
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
 * Static helpers for splitting work into fixed-size batches and handing each batch to a
 * callback.
 *
 * <p>Every {@code doBatch} overload dispatches through a parallel stream, so the callback may
 * be invoked concurrently from several threads and in no particular order. Callbacks must
 * therefore be safe to call from multiple threads.
 */
public final class BatchJobUtils {
    /**
     * Batch size used by the overloads that do not take an explicit size. The field is
     * intentionally left non-final so existing code that assigns it keeps compiling.
     */
    public static int DEFAULT_MAX_BATCH_SIZE = 1000;

    /**
     * Retained only so existing {@code new BatchJobUtils()} call sites keep compiling;
     * all members are static and no instance is needed.
     */
    public BatchJobUtils() {
    }

    /**
     * Returns the number of batches needed for {@code size} items using
     * {@link #DEFAULT_MAX_BATCH_SIZE}.
     *
     * @param size number of items
     * @return number of batches, never less than 1
     * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
     */
    public static int countStep(int size) {
        return countStep(size, DEFAULT_MAX_BATCH_SIZE);
    }

    /**
     * Returns the number of batches needed for {@code size} items when each batch holds at
     * most {@code limit} items (ceiling division).
     *
     * <p>A {@code size} smaller than {@code limit} (including zero) yields 1, so callers
     * always get at least one batch.
     *
     * @param size  number of items
     * @param limit maximum items per batch; must be positive
     * @return number of batches, never less than 1
     * @throws IllegalArgumentException if {@code limit} is not positive
     */
    public static int countStep(int size, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive, got " + limit);
        }
        if (size < limit) {
            return 1;
        }
        int step = size / limit;
        return size % limit == 0 ? step : step + 1;
    }

    /**
     * Invokes {@code consumer} once per batch index, {@code 0} to
     * {@code countStep(size) - 1}, using {@link #DEFAULT_MAX_BATCH_SIZE} as the batch size.
     *
     * @param size     total number of items to be processed
     * @param consumer callback receiving the zero-based batch index; may be called
     *                 concurrently and in any order
     * @throws NullPointerException     if {@code consumer} is null
     * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
     */
    public static void doBatch(int size, Consumer<Integer> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        int step = countStep(size, DEFAULT_MAX_BATCH_SIZE);
        // A sized range splits evenly across threads, unlike Stream.iterate(...).limit(...).
        IntStream.range(0, step).parallel().forEach(consumer::accept);
    }

    /**
     * Splits {@code list} into batches of at most {@link #DEFAULT_MAX_BATCH_SIZE} elements
     * and passes each batch to {@code consumer}.
     *
     * @param list     elements to split
     * @param consumer callback receiving one batch; may be called concurrently and in any order
     * @param <T>      element type
     * @throws NullPointerException     if {@code list} or {@code consumer} is null
     * @throws IllegalArgumentException if {@link #DEFAULT_MAX_BATCH_SIZE} is not positive
     */
    public static <T> void doBatch(Collection<T> list, Consumer<Collection<T>> consumer) {
        // Read the mutable default exactly once so every batch is cut with the same size.
        doBatch(list, DEFAULT_MAX_BATCH_SIZE, consumer);
    }

    /**
     * Splits {@code list} into batches of at most {@code batchSize} elements and passes each
     * batch to {@code consumer}.
     *
     * <p>The collection is copied once in its iteration order, and the batches are consecutive
     * slices of that copy, so every element lands in exactly one batch even when the source
     * is an unordered collection such as a {@code HashSet}. Each batch is an independent
     * mutable list. An empty collection results in a single call with an empty batch.
     *
     * @param list      elements to split
     * @param batchSize maximum number of elements per batch; must be positive
     * @param consumer  callback receiving one batch; may be called concurrently and in any order
     * @param <T>       element type
     * @throws NullPointerException     if {@code list} or {@code consumer} is null
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T> void doBatch(Collection<T> list, int batchSize, Consumer<Collection<T>> consumer) {
        Objects.requireNonNull(list, "list");
        Objects.requireNonNull(consumer, "consumer");
        // One snapshot gives stable indices; slicing it avoids re-traversing the source per batch.
        List<T> snapshot = new ArrayList<>(list);
        int total = snapshot.size();
        int step = countStep(total, batchSize);
        IntStream.range(0, step).parallel().forEach(n -> {
            // Multiply in long so n * batchSize cannot overflow int before clamping.
            int from = (int) Math.min((long) n * batchSize, (long) total);
            int to = (int) Math.min((long) from + batchSize, (long) total);
            consumer.accept(new ArrayList<>(snapshot.subList(from, to)));
        });
    }
}
该工具类的使用示例:
// Look up suppliers whose names already exist.
// The latch is sized with countStep(size), the same batch count that the two-argument
// doBatch below derives from the default batch size, so there is one countDown per batch.
CountDownLatch querySupplierCountDownLatch = new CountDownLatch(BatchJobUtils.countStep(supplierNameSet.size()));
// doBatch takes the data to consume first and the per-batch logic second. Each batch is
// handed to the thread pool, which runs the lookup and fills supplierMap.
// NOTE(review): supplierMap is written from pool threads; presumably a concurrent map; confirm.
BatchJobUtils.doBatch(Arrays.asList(supplierNameSet.toArray()), subList -> {
    threadPoolTaskExecutor.execute(() -> {
        try {
            // Supplier duplicate check
            List<JdiSupplierDto> supplierDtoList = this.jdiSupplierService.listBySupplierNames(subList, jdiImportExportOperateMsg.getShopId());
            if (CollectionUtils.isNotEmpty(supplierDtoList)) {
                for (JdiSupplierDto supplierDto : supplierDtoList) {
                    supplierMap.put(supplierDto.getSupplierName(), supplierDto);
                }
            }
        } finally {
            // Always release the latch; otherwise a failed lookup would keep await() blocked
            // until the full timeout elapses.
            querySupplierCountDownLatch.countDown();
        }
    });
});
try {
    // NOTE(review): await returns false on timeout and the result is not checked here, so
    // processing continues with whatever batches have finished; confirm this is intended.
    querySupplierCountDownLatch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new BasicException("importAddOrModifyJdiSupplierArray.findJdiSupplierBySupplierNames", ResultEnum.OUTER_SERVICE_EXCEPTION.getMessage(), e);
}