===自己处理大数据数据库操作,通过线程池处理====
    1.定义数据处理队列,该队列为链表阻塞队列(链式存储,插入/取出双锁控制效率高于基于数组的数组阻塞队列),防止内存溢出,队列数量给默认值
    2.单线程池,用于插入队列
    3.定时线程池,定期处理一定数量的数据记录(200条)
    4.多线程池,用于数据插入任务,一定数量的数据记录建立一个线程任务,防止内存溢出,队列数量给默认值
    140000记录 7分钟
    线程池的使用,核心线程->队列->最大线程
    ===自己处理大数据数据库操作,通过线程池处理====================================
    https://www.cnblogs.com/zhujiabin/p/5404771.html 线程池
    血案:https://fangshixiang.blog.csdn.net/article/details/83656170
    自己,单线程中开启带返回值的多线程:https://www.yuque.com/glacier777/sif6ci/razgx6
    1.带返回值的线程,使用 get 方法,阻塞当前进程,如果线程执行慢,get方法会一直等待执行结果
    2.所以如果是将任务分给带返回值的多线程,在外面在开启一个无返回值的单线程
    3.多线程的开启,可以自己定义多个FutureTask用Thead开启,如https://www.yuque.com/glacier777/sif6ci/razgx6,也可以用线程池,下面的例子
    转:https://blog.csdn.net/u010797364/article/details/120699570
    https://blog.csdn.net/weixin_35431719/article/details/114311296 线程三种方式
    ScheduledExecutorService 继承自ExecutorService 定时器线程池 循环线程 最大优点除了线程池的特性以外,可以实现循环或延迟任务。
    CountDownLatch 。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。如:https://blog.csdn.net/weixin_38193118/article/details/114282245
    线程中断:Future.cancel https://blog.csdn.net/weixin_34309435/article/details/91694963
    https://blog.csdn.net/weixin_35884854/article/details/114050442
    https://blog.csdn.net/weixin_42039228/article/details/123198358
    可返回值的任务必须实现Callable接口,类似的,无返回值的任务必须Runnable接口。
    执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。
    ScheduledThreadPoolExecutor是ScheduledExecutorService的子类,

    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(num);
    1
    它有以下常用的方法:

    //获取未完成的任务队列
    executor.getQueue();
    //移除任务队列中的任务 future = executor.scheduleAtFixedRate/scheduleWithFixedDelay
    executor.remove((Runnable) future);
    //取消任务队列中的schedule任务,但不会在队列中删除,即getQueue的数量不会减1,executor.setRemoveOnCancelPolicy(true)的时候减1
    boolean cancel = future.cancel(true);
    //schedule任务是否取消
    future.isCancelled()
    //延时的schedule在shutdown后,不会继续执行没到时间的任务,默认是true,会执行一次正在倒计时的任务再结束
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    //延时的scheduleAtFixedRate/scheduleWithFixedDelay在shutdown后,不会继续执行没到时间的任务,默认是true,会执行一次正在倒计时的任务再结束
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);

    关于阻塞队列的说明
    https://blog.csdn.net/javazejian/article/details/77410889
    https://blog.csdn.net/Carino_U/article/details/80005473
    线程池ThreadPool有界、无界队列、同步队列以及5大参数
    https://blog.csdn.net/qq_40203483/article/details/117406589
    关于拒绝策略
    https://blog.csdn.net/qq_45537574/article/details/121746015
    对于这个7个参数线程池是如何使用的呢,接着往下看,线程池的执行流程:
    1、当向线程池提交任务时,会去创建核心线程
    2、当任务太多,核心线程来不及处理时,会将任务先放入任务队列
    3、当任务队列满了,会去创建新的线程处理任务,但不会超过最大线程数
    4、当线程空闲超过我们设置的时间大小,则回去销毁(核心线程数-最大线程数)之间的线程
    5、当队列已满,总线程数已达我们设置的最大线程数时,新的任务进来则会触发我们设置的拒绝策略

    总结:也即是说在线程池队列已满,且已创建到最大线程数量时,新的任务提交至线程池,线程池无法处理新的任务,则会对新的任务线程进行丢弃,抛异常,优先处理,丢弃队列中最久等待任务,这些对新的任务线程的处理则称之为线程池的拒绝策略
    优先策略的讨论
    https://www.zhihu.com/question/412004413

    public class threadUtils {
    private static long waitTime=1200;//休眠
    public static void main(String[] args) throws Exception {
    //创建一个线程池
    ExecutorService pool = Executors.newFixedThreadPool(3);
    //创建两个有返回值的任务
    Callable c_One = new CallableUtil(“周杰伦”);
    Callable c_Two = new CallableUtil(“王力宏”);
    Callable c_Three = new CallableUtil(“林俊杰”);

    1. //执行任务并获取Future对象<br /> Future f1 = pool.submit(c_One);<br /> Thread.sleep(waitTime);<br /> Future f2 = pool.submit(c_Two);<br /> Thread.sleep(waitTime);<br /> Future f3 = pool.submit(c_Three);<br /> Thread.sleep(waitTime);
    2. //从Future对象上获取任务的返回值,并输出到控制台<br /> System.out.println(">>>"+f1.get().toString());<br /> System.out.println(">>>"+f2.get().toString());<br /> System.out.println(">>>"+f3.get().toString());
    3. //关闭线程池<br /> pool.shutdown();<br /> }<br />}

    package com.clinicalresearch.core.thread;

    import java.util.concurrent.Callable;

    public class CallableUtil implements Callable {
    private String author;

    1. public CallableUtil(String author) {<br /> this.author = author;<br /> }
    2. /**<br /> * 业务逻辑<br /> * @return<br /> * @throws Exception<br /> */<br /> @Override<br /> public Object call() throws Exception {
    3. String msg="";
    4. if ("周杰伦".equals(author)){<br /> msg=author+"在2004年出了新专辑!";<br /> }<br /> if ("王力宏".equals(author)){<br /> msg=author+"在2005年出了新专辑!";<br /> }<br /> if ("林俊杰".equals(author)){<br /> msg=author+"在2006年出了新专辑!";<br /> }
    5. return msg;<br /> }<br />}

    —————————————————————————————————————————————————————————
    /*
    @author: qzf
    @date: 2022-5-12 17:58:18
    @describe:处理数据库的线程池(定时器)
    */
    @Component
    public class DbThreadTimeUtil {
    private static final Logger log = LoggerFactory.getLogger(DbThreadTimeUtil.class);
    //cpu核心数
    private static final int processorsNum = Runtime.getRuntime().availableProcessors();
    //数据库处理队列,防止内存溢出,队列数量给默认值
    public static LinkedBlockingQueue queue = new LinkedBlockingQueue(200000);
    //单线程池,用于插入队列
    public static ExecutorService poolQueue;
    //定时线程池,定期处理一定数量的数据记录
    public static ScheduledExecutorService poolTimer;
    private final static int threadSingleProcessNum = ApplicationConfig.threadSingleProcessNum;//单次处理数据数量
    //多线程池,用于数据插入任务,防止内存溢出,队列数量给默认值
    public static ExecutorService poolDb;

    1. @PostConstruct<br /> public void init(){<br /> poolQueue = Executors.newSingleThreadExecutor();<br /> poolTimer = Executors.newSingleThreadScheduledExecutor();<br /> poolDb = new ThreadPoolExecutor(processorsNum * 2, processorsNum * 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100000),<br /> new DbThreadFactory("DbThreadTimeUtil"),<br /> new RejectedExecutionHandler() {<br /> @Override<br /> public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {<br /> String msg = "阻塞队列异常Task " + r.toString() +<br /> " rejected from " + executor.toString();<br /> log.error(msg);<br /> throw new RejectedExecutionException(msg);<br /> }<br /> });<br /> //每10秒检测队列中是否有需要处理的数据<br /> poolTimer.scheduleAtFixedRate(new Runnable() {<br /> @Override<br /> public void run() {<br /> try{<br /> //提交每次处理的数据到线程池<br /> int size = queue.size();<br /> int div = size / threadSingleProcessNum;<br /> int mod = size % threadSingleProcessNum;<br /> for(int i = 0;i < div; i++){<br /> List<SxzdProtocol> list = new ArrayList<>();<br /> int j = 0;<br /> while(j < threadSingleProcessNum ) {<br /> list.add(queue.take());<br /> j++;<br /> }<br /> addDbPoolTask(list);<br /> }<br /> //提交余下的数据到线程池<br /> if(mod > 0){<br /> List<SxzdProtocol> list = new ArrayList<>();<br /> int j = 0;<br /> while(j < mod ) {<br /> list.add(queue.take());<br /> j++;<br /> }<br /> addDbPoolTask(list);<br /> }<br /> }<br /> catch (Exception e){<br /> e.printStackTrace();<br /> log.error("定时执行出错:"+e.getMessage());<br /> }<br /> }<br /> },10,10,TimeUnit.SECONDS);<br /> }<br /> @PreDestroy<br /> public void destroy(){<br /> System.out.print("关闭线程池DbThreadTimeUtil");<br /> log.info("关闭线程池SxzdDecodeUtil");<br /> shutdownPool();<br /> }<br /> private void shutdownPool(){<br /> //1、停止线程池接收新任务<br /> //2、等待一定时间,让现存的任务执行结束<br /> //3、取消当前运行的任务<br /> //4、如果当前线程也被中断的话,那么就再次关闭线程池,同时恢复中断状态<br /> try{<br /> poolTimer.shutdown();<br /> if(!poolTimer.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolTimer.shutdownNow();<br /> if(!poolTimer.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolTimer.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> try{<br /> poolQueue.shutdown();<br /> if(!poolQueue.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolQueue.shutdownNow();<br /> if(!poolQueue.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolQueue.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> <br /> try{<br /> poolDb.shutdown();<br /> if(!poolDb.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolDb.shutdownNow();<br /> if(!poolDb.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolDb.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> }<br /> /**<br /> * @author: qzf<br /> * @date: 2022/5/12 17:51<br /> * @describe: 通过单线程池将本次要处理的数据对象插入待数据处理队列(有序)<br /> * @param: <br /> * @return: <br /> */<br /> public static void insertQueue(SxzdProtocol sxzdProtocol){<br /> try {<br /> poolQueue.execute(new Runnable() {<br /> @Override<br /> public void run() {<br /> try {<br /> queue.put(sxzdProtocol);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("插入队列出错:" + e.getMessage());<br /> }<br /> }<br /> });<br /> }<br /> catch(Exception e){<br /> e.printStackTrace();<br /> log.error("poolQueue插入队列出错:" + e.getMessage());<br /> }<br /> }<br /> /**<br /> * @author: qzf<br /> * @date: 2022/5/12 17:53<br /> * @describe: 本批待数据处理插入处理线程池执行<br /> * @param: <br /> * @return: <br /> */<br /> private static void addDbPoolTask(List<SxzdProtocol> list){<br /> if(list.size() == 0) {<br /> return;<br /> }<br /> try {<br /> poolDb.execute(new Runnable() {<br /> @Override<br /> public void run() {<br /> List<YwTplsXq> ywTplsXqList = new ArrayList<>();<br /> List<YwTpls> ywTplsList = new ArrayList<>();<br /> YwTplsXq ywTplsXq = new YwTplsXq();<br /> YwTpls ywTpls = new YwTpls();<br /> for (SxzdProtocol sxzdProtocol : list) {<br /> log.debug("线程名字" + Thread.currentThread().getName());<br /> //图片(设备->平台)<br /> if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PIC_S2P){<br /> try {<br /> //log.debug("keynosub=====:"+sxzdProtocol.getKeynoSub());<br /> ywTplsXq = new YwTplsXq();<br /> ywTplsXq.setXqno(sxzdProtocol.getKeynoSub());<br /> ywTplsXq.setKeyno(sxzdProtocol.getKeyno());<br /> ywTplsXq.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywTplsXq.setXh(sxzdProtocol.getMsgseqInt());<br /> ywTplsXq.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> SxzdDecodeUtil.ywTplsXqService.save(ywTplsXq);<br /> //待处理测试使用正式屏蔽数据库异常<br /> //int a = 1/0;<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片分包解码信息插入数据库记录出错" + e.getMessage());<br /> ywTplsXq.setCreatedate(DateUtil.getCurrentDateTime());<br /> ywTplsXqList.add(ywTplsXq);<br /> }<br /> try {<br /> //如果生成了图片,插入图片历史记录<br /> if (!StringUtil.isEmpty(sxzdProtocol.getPicUrl())) {<br /> ywTpls = new YwTpls();<br /> ywTpls.setKeyno(sxzdProtocol.getKeyno());<br /> ywTpls.setIp(sxzdProtocol.getIp());<br /> ywTpls.setPort(sxzdProtocol.getPort());<br /> ywTpls.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywTpls.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getPicBytesAll()));//无需处理<br /> ywTpls.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ywTpls.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ywTpls.setImei(sxzdProtocol.getSbbhChar());<br /> ywTpls.setTppslx(ByteUtil.bytesToIntHighAhead(sxzdProtocol.getPslx()));<br /> ywTpls.setTpwjwz(sxzdProtocol.getPicUrl());<br /> ywTpls.setAlsbsz(StringUtil.isEmpty(sxzdProtocol.getSbds()) ? BigDecimal.ZERO : new BigDecimal(sxzdProtocol.getSbds()));<br /> ywTpls.setPzsj(sxzdProtocol.getPzsjChar());<br /> if (SxzdDecodeUtil.ywTplsService.getByPrimaryKey(sxzdProtocol.getKeyno()) == null) {<br /> SxzdDecodeUtil.ywTplsService.save(ywTpls);<br /> } else {<br /> SxzdDecodeUtil.ywTplsService.update(ywTpls);<br /> }<br /> //待处理测试使用正式屏蔽数据库异常<br /> //int a = 1/0;<br /> }<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片完整解码信息插入数据库记录出错" + e.getMessage());<br /> ywTpls.setCreatedate(DateUtil.getCurrentDateTime());<br /> ywTplsList.add(ywTpls);<br /> }<br /> }<br /> //图片(平台->设备)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PIC_P2S) {<br /> try {<br /> YwYdtplsXq ywYdtplsXq = new YwYdtplsXq();<br /> ywYdtplsXq.setXqno(sxzdProtocol.getKeynoSub());<br /> ywYdtplsXq.setKeyno(sxzdProtocol.getKeyno());<br /> ywYdtplsXq.setXh(sxzdProtocol.getMsgseqInt());<br /> ywYdtplsXq.setIp(sxzdProtocol.getIp());<br /> ywYdtplsXq.setPort(sxzdProtocol.getPort());<br /> ywYdtplsXq.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywYdtplsXq.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> ywYdtplsXq.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ywYdtplsXq.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ywYdtplsXq.setImei(sxzdProtocol.getSbbhChar());<br /> ywYdtplsXq.setAesflag(ByteUtil.bytesToIntHighAhead(sxzdProtocol.getJmbs()));<br /> ywYdtplsXq.setJgm(sxzdProtocol.getResultcodeInt() + "");<br /> ywYdtplsXq.setYdsj(DateUtil.getCurrentDateTime());<br /> SxzdEncodeUtil.ywYdtplsXqService.save(ywYdtplsXq);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片编码信息插入数据库记录出错" + e.getMessage());<br /> }<br /> }<br /> //参数(设备->平台)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PARAM_S2P){<br /> //记录参数数据主键<br /> YwQqcslsjl qqcslsjl = new YwQqcslsjl();<br /> String dtxxNote = "";<br /> qqcslsjl.setKeyno(sxzdProtocol.getKeyno());<br /> qqcslsjl.setIp(sxzdProtocol.getIp());<br /> qqcslsjl.setPort(sxzdProtocol.getPort());<br /> qqcslsjl.setXxbm(sxzdProtocol.getMsgidInt()+"");<br /> qqcslsjl.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> qqcslsjl.setGjver(sxzdProtocol.getGjverInt()+"");<br /> qqcslsjl.setMyver(sxzdProtocol.getMyverInt()+"");<br /> qqcslsjl.setImei(sxzdProtocol.getSbbhChar());<br /> qqcslsjl.setSbdy(sxzdProtocol.getDyInt());<br /> qqcslsjl.setRsrp((int)sxzdProtocol.getXhqd()[0]);<br /> if(sxzdProtocol.getDecodeMsgbodyArray() != null) {<br /> for (SxzdDecodeMsgBody sxzdDecodeMsgBody : sxzdProtocol.getDecodeMsgbodyArray()){<br /> dtxxNote += sxzdDecodeMsgBody.getFieldRemark()+"\n";<br /> }<br /> }<br /> qqcslsjl.setDtxxNote(dtxxNote);<br /> qqcslsjl.setDtxx(ByteUtil.bytesToHex(sxzdProtocol.getMsgbody()));<br /> qqcslsjl.setQqsj(DateUtil.getCurrentDateTime());<br /> SxzdDecodeUtil.ywQqcslsjlService.save(qqcslsjl);<br /> }<br /> //参数(平台->设备)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PARAM_P2S) {<br /> try {<br /> String dtxxNote = "";<br /> YwYdcslsjl ydcslsjl = new YwYdcslsjl();<br /> ydcslsjl.setKeyno(sxzdProtocol.getKeyno());<br /> ydcslsjl.setIp(sxzdProtocol.getIp());<br /> ydcslsjl.setPort(sxzdProtocol.getPort());<br /> ydcslsjl.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ydcslsjl.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> ydcslsjl.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ydcslsjl.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ydcslsjl.setImei(sxzdProtocol.getSbbhChar());<br /> ydcslsjl.setAesflag(ByteUtil.bytesToIntHighAhead( sxzdProtocol.getJmbs()));<br /> ydcslsjl.setJgm(sxzdProtocol.getResultcodeInt() + "");<br /> ydcslsjl.setBgz(sxzdProtocol.getBgzInt());<br /> ydcslsjl.setSgdflag(sxzdProtocol.getSgdflagInt());<br /> ydcslsjl.setXccbsjc(sxzdProtocol.getXccbsjcChar());<br /> ydcslsjl.setXctxsjc(sxzdProtocol.getXctxsjcChar());<br /> ydcslsjl.setCccs(sxzdProtocol.getCccsInt());<br /> ydcslsjl.setCcjgsj(sxzdProtocol.getCcjgsjInt());<br /> if (sxzdProtocol.getEncodeMsgbodyArray() != null) {<br /> for (SxzdEncodeMsgBody sxzdEncodeMsgBody : sxzdProtocol.getEncodeMsgbodyArray()) {<br /> dtxxNote += sxzdEncodeMsgBody.getFieldRemark() + "\n";<br /> }<br /> }<br /> ydcslsjl.setDtxxNote(dtxxNote);<br /> ydcslsjl.setDtxx(ByteUtil.bytesToHex(sxzdProtocol.getMsgbody()));<br /> ydcslsjl.setYdsj(DateUtil.getCurrentDateTime());<br /> SxzdEncodeUtil.ywYdcslsjlService.save(ydcslsjl);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("参数编码信息插入数据库记录出错" + e.getMessage());<br /> }<br /> }
    2. }<br /> String csvSecondPath = (new SimpleDateFormat("yyyy-MM")).format(new Date())+"/csv/";<br /> //将插入失败的记录,同时保存csv和h2db数据库<br /> if(ywTplsXqList.size() > 0){<br /> LinkedHashMap<String,Object> map = new LinkedHashMap<>();<br /> map.put("分包消息主键","xqno");<br /> map.put("消息主键","keyno");<br /> map.put("消息编码","xxbm");<br /> map.put("序号","xh");<br /> map.put("消息内容","xxnr");<br /> map.put("入库时间","createdate");<br /> MyCsvUtil.log(map,ywTplsXqList,csvSecondPath,"分包图片");<br /> for(YwTplsXq aYwTplsXq: ywTplsXqList) {<br /> try {<br /> SxzdDecodeUtil.ywTplsXqServiceH2.save(aYwTplsXq);<br /> }catch(Exception e){<br /> e.printStackTrace();<br /> log.error("分包图片插入h2db出错"+e.getMessage());<br /> }<br /> }<br /> }<br /> if(ywTplsList.size() > 0){<br /> LinkedHashMap<String,Object> map = new LinkedHashMap<>();<br /> map.put("消息主键","keyno");<br /> map.put("ip","ip");<br /> map.put("端口","port");<br /> map.put("消息编码","xxbm");<br /> map.put("消息内容","xxnr");<br /> map.put("固件版本","gjver");<br /> map.put("密钥版本","myver");<br /> map.put("设备编号","imei");<br /> map.put("图片拍摄类型","tppslx");<br /> map.put("图片文件位置","tpwjwz");<br /> map.put("阿里识别数值","alsbsz");<br /> map.put("拍照时间","pzsj");<br /> map.put("入库时间","createdate");<br /> MyCsvUtil.log(map,ywTplsList,csvSecondPath,"完整图片");<br /> for(YwTpls aYwTpls: ywTplsList) {<br /> try {<br /> if (SxzdDecodeUtil.ywTplsServiceH2.getByPrimaryKey(aYwTpls.getKeyno()) == null) {<br /> SxzdDecodeUtil.ywTplsServiceH2.save(aYwTpls);<br /> } else {<br /> SxzdDecodeUtil.ywTplsServiceH2.update(aYwTpls);<br /> }<br /> }catch (Exception e){<br /> e.printStackTrace();<br /> log.error("分包图片插入h2db出错"+e.getMessage());<br /> }<br /> }<br /> }<br /> }<br /> });<br /> }<br /> catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("本批待数据处理插入数据处理线程池执行出错:" + e.getMessage());<br /> }<br /> }<br />}