===自己处理大数据数据库操作,通过线程池处理====
1.定义数据处理队列,该队列为链表阻塞队列(链式存储,插入/取出双锁控制效率高于基于数组的数组阻塞队列),防止内存溢出,队列数量给默认值
2.单线程池,用于插入队列
3.定时线程池,定期处理一定数量的数据记录(200条)
4.多线程池,用于数据插入任务,一定数量的数据记录建立一个线程任务,防止内存溢出,队列数量给默认值
140000记录 7分钟
线程池的使用,核心线程->队列->最大线程
===自己处理大数据数据库操作,通过线程池处理====================================
https://www.cnblogs.com/zhujiabin/p/5404771.html 线程池
血案:https://fangshixiang.blog.csdn.net/article/details/83656170
自己,单线程中开启带返回值的多线程:https://www.yuque.com/glacier777/sif6ci/razgx6
1.带返回值的线程,使用 get 方法,阻塞当前进程,如果线程执行慢,get方法会一直等待执行结果
2.所以如果是将任务分给带返回值的多线程,在外面在开启一个无返回值的单线程
3.多线程的开启,可以自己定义多个FutureTask用Thead开启,如https://www.yuque.com/glacier777/sif6ci/razgx6,也可以用线程池,下面的例子
转:https://blog.csdn.net/u010797364/article/details/120699570
https://blog.csdn.net/weixin_35431719/article/details/114311296 线程三种方式
ScheduledExecutorService 继承自ExecutorService 定时器线程池 循环线程 最大优点除了线程池的特性以外,可以实现循环或延迟任务。
CountDownLatch 。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。如:https://blog.csdn.net/weixin_38193118/article/details/114282245
线程中断:Future.cancel https://blog.csdn.net/weixin_34309435/article/details/91694963
https://blog.csdn.net/weixin_35884854/article/details/114050442
https://blog.csdn.net/weixin_42039228/article/details/123198358
可返回值的任务必须实现Callable接口,类似的,无返回值的任务必须Runnable接口。
执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。
ScheduledThreadPoolExecutor是ScheduledExecutorService的子类,
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(num);
1
它有以下常用的方法:
//获取未完成的任务队列
executor.getQueue();
//移除任务队列中的任务 future = executor.scheduleAtFixedRate/scheduleWithFixedDelay
executor.remove((Runnable) future);
//取消任务队列中的schedule任务,但不会在队列中删除,即getQueue的数量不会减1,executor.setRemoveOnCancelPolicy(true)的时候减1
boolean cancel = future.cancel(true);
//schedule任务是否取消
future.isCancelled()
//延时的schedule在shutdown后,不会继续执行没到时间的任务,默认是true,会执行一次正在倒计时的任务再结束
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
//延时的scheduleAtFixedRate/scheduleWithFixedDelay在shutdown后,不会继续执行没到时间的任务,默认是true,会执行一次正在倒计时的任务再结束
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
关于阻塞队列的说明
https://blog.csdn.net/javazejian/article/details/77410889
https://blog.csdn.net/Carino_U/article/details/80005473
线程池ThreadPool有界、无界队列、同步队列以及5大参数
https://blog.csdn.net/qq_40203483/article/details/117406589
关于拒绝策略
https://blog.csdn.net/qq_45537574/article/details/121746015
对于这个7个参数线程池是如何使用的呢,接着往下看,线程池的执行流程:
1、当向线程池提交任务时,会去创建核心线程
2、当任务太多,核心线程来不及处理时,会将任务先放入任务队列
3、当任务队列满了,会去创建新的线程处理任务,但不会超过最大线程数
4、当线程空闲超过我们设置的时间大小,则回去销毁(核心线程数-最大线程数)之间的线程
5、当队列已满,总线程数已达我们设置的最大线程数时,新的任务进来则会触发我们设置的拒绝策略
总结:也即是说在线程池队列已满,且已创建到最大线程数量时,新的任务提交至线程池,线程池无法处理新的任务,则会对新的任务线程进行丢弃,抛异常,优先处理,丢弃队列中最久等待任务,这些对新的任务线程的处理则称之为线程池的拒绝策略
优先策略的讨论
https://www.zhihu.com/question/412004413
public class threadUtils {
private static long waitTime=1200;//休眠
public static void main(String[] args) throws Exception {
//创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(3);
//创建两个有返回值的任务
Callable c_One = new CallableUtil(“周杰伦”);
Callable c_Two = new CallableUtil(“王力宏”);
Callable c_Three = new CallableUtil(“林俊杰”);
//执行任务并获取Future对象<br /> Future f1 = pool.submit(c_One);<br /> Thread.sleep(waitTime);<br /> Future f2 = pool.submit(c_Two);<br /> Thread.sleep(waitTime);<br /> Future f3 = pool.submit(c_Three);<br /> Thread.sleep(waitTime);
//从Future对象上获取任务的返回值,并输出到控制台<br /> System.out.println(">>>"+f1.get().toString());<br /> System.out.println(">>>"+f2.get().toString());<br /> System.out.println(">>>"+f3.get().toString());
//关闭线程池<br /> pool.shutdown();<br /> }<br />}
package com.clinicalresearch.core.thread;
import java.util.concurrent.Callable;
public class CallableUtil implements Callable {
private String author;
public CallableUtil(String author) {<br /> this.author = author;<br /> }
/**<br /> * 业务逻辑<br /> * @return<br /> * @throws Exception<br /> */<br /> @Override<br /> public Object call() throws Exception {
String msg="";
if ("周杰伦".equals(author)){<br /> msg=author+"在2004年出了新专辑!";<br /> }<br /> if ("王力宏".equals(author)){<br /> msg=author+"在2005年出了新专辑!";<br /> }<br /> if ("林俊杰".equals(author)){<br /> msg=author+"在2006年出了新专辑!";<br /> }
return msg;<br /> }<br />}
—————————————————————————————————————————————————————————
/*
@author: qzf
@date: 2022-5-12 17:58:18
@describe:处理数据库的线程池(定时器)
*/
@Component
public class DbThreadTimeUtil {
private static final Logger log = LoggerFactory.getLogger(DbThreadTimeUtil.class);
//cpu核心数
private static final int processorsNum = Runtime.getRuntime().availableProcessors();
//数据库处理队列,防止内存溢出,队列数量给默认值
public static LinkedBlockingQueue
//单线程池,用于插入队列
public static ExecutorService poolQueue;
//定时线程池,定期处理一定数量的数据记录
public static ScheduledExecutorService poolTimer;
private final static int threadSingleProcessNum = ApplicationConfig.threadSingleProcessNum;//单次处理数据数量
//多线程池,用于数据插入任务,防止内存溢出,队列数量给默认值
public static ExecutorService poolDb;
@PostConstruct<br /> public void init(){<br /> poolQueue = Executors.newSingleThreadExecutor();<br /> poolTimer = Executors.newSingleThreadScheduledExecutor();<br /> poolDb = new ThreadPoolExecutor(processorsNum * 2, processorsNum * 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100000),<br /> new DbThreadFactory("DbThreadTimeUtil"),<br /> new RejectedExecutionHandler() {<br /> @Override<br /> public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {<br /> String msg = "阻塞队列异常Task " + r.toString() +<br /> " rejected from " + executor.toString();<br /> log.error(msg);<br /> throw new RejectedExecutionException(msg);<br /> }<br /> });<br /> //每10秒检测队列中是否有需要处理的数据<br /> poolTimer.scheduleAtFixedRate(new Runnable() {<br /> @Override<br /> public void run() {<br /> try{<br /> //提交每次处理的数据到线程池<br /> int size = queue.size();<br /> int div = size / threadSingleProcessNum;<br /> int mod = size % threadSingleProcessNum;<br /> for(int i = 0;i < div; i++){<br /> List<SxzdProtocol> list = new ArrayList<>();<br /> int j = 0;<br /> while(j < threadSingleProcessNum ) {<br /> list.add(queue.take());<br /> j++;<br /> }<br /> addDbPoolTask(list);<br /> }<br /> //提交余下的数据到线程池<br /> if(mod > 0){<br /> List<SxzdProtocol> list = new ArrayList<>();<br /> int j = 0;<br /> while(j < mod ) {<br /> list.add(queue.take());<br /> j++;<br /> }<br /> addDbPoolTask(list);<br /> }<br /> }<br /> catch (Exception e){<br /> e.printStackTrace();<br /> log.error("定时执行出错:"+e.getMessage());<br /> }<br /> }<br /> },10,10,TimeUnit.SECONDS);<br /> }<br /> @PreDestroy<br /> public void destroy(){<br /> System.out.print("关闭线程池DbThreadTimeUtil");<br /> log.info("关闭线程池SxzdDecodeUtil");<br /> shutdownPool();<br /> }<br /> private void shutdownPool(){<br /> //1、停止线程池接收新任务<br /> //2、等待一定时间,让现存的任务执行结束<br /> //3、取消当前运行的任务<br /> //4、如果当前线程也被中断的话,那么就再次关闭线程池,同时恢复中断状态<br /> try{<br /> poolTimer.shutdown();<br /> if(!poolTimer.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolTimer.shutdownNow();<br /> if(!poolTimer.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolTimer.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> try{<br /> poolQueue.shutdown();<br /> if(!poolQueue.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolQueue.shutdownNow();<br /> if(!poolQueue.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolQueue.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> <br /> try{<br /> poolDb.shutdown();<br /> if(!poolDb.awaitTermination(60,TimeUnit.SECONDS)){<br /> poolDb.shutdownNow();<br /> if(!poolDb.awaitTermination(60,TimeUnit.SECONDS)){<br /> System.out.println("DbThreadTimeUtil Pool did not terminate");<br /> }<br /> }<br /> }catch(InterruptedException ie){<br /> poolDb.shutdownNow();<br /> Thread.currentThread().interrupt();<br /> }<br /> }<br /> /**<br /> * @author: qzf<br /> * @date: 2022/5/12 17:51<br /> * @describe: 通过单线程池将本次要处理的数据对象插入待数据处理队列(有序)<br /> * @param: <br /> * @return: <br /> */<br /> public static void insertQueue(SxzdProtocol sxzdProtocol){<br /> try {<br /> poolQueue.execute(new Runnable() {<br /> @Override<br /> public void run() {<br /> try {<br /> queue.put(sxzdProtocol);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("插入队列出错:" + e.getMessage());<br /> }<br /> }<br /> });<br /> }<br /> catch(Exception e){<br /> e.printStackTrace();<br /> log.error("poolQueue插入队列出错:" + e.getMessage());<br /> }<br /> }<br /> /**<br /> * @author: qzf<br /> * @date: 2022/5/12 17:53<br /> * @describe: 本批待数据处理插入处理线程池执行<br /> * @param: <br /> * @return: <br /> */<br /> private static void addDbPoolTask(List<SxzdProtocol> list){<br /> if(list.size() == 0) {<br /> return;<br /> }<br /> try {<br /> poolDb.execute(new Runnable() {<br /> @Override<br /> public void run() {<br /> List<YwTplsXq> ywTplsXqList = new ArrayList<>();<br /> List<YwTpls> ywTplsList = new ArrayList<>();<br /> YwTplsXq ywTplsXq = new YwTplsXq();<br /> YwTpls ywTpls = new YwTpls();<br /> for (SxzdProtocol sxzdProtocol : list) {<br /> log.debug("线程名字" + Thread.currentThread().getName());<br /> //图片(设备->平台)<br /> if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PIC_S2P){<br /> try {<br /> //log.debug("keynosub=====:"+sxzdProtocol.getKeynoSub());<br /> ywTplsXq = new YwTplsXq();<br /> ywTplsXq.setXqno(sxzdProtocol.getKeynoSub());<br /> ywTplsXq.setKeyno(sxzdProtocol.getKeyno());<br /> ywTplsXq.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywTplsXq.setXh(sxzdProtocol.getMsgseqInt());<br /> ywTplsXq.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> SxzdDecodeUtil.ywTplsXqService.save(ywTplsXq);<br /> //待处理测试使用正式屏蔽数据库异常<br /> //int a = 1/0;<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片分包解码信息插入数据库记录出错" + e.getMessage());<br /> ywTplsXq.setCreatedate(DateUtil.getCurrentDateTime());<br /> ywTplsXqList.add(ywTplsXq);<br /> }<br /> try {<br /> //如果生成了图片,插入图片历史记录<br /> if (!StringUtil.isEmpty(sxzdProtocol.getPicUrl())) {<br /> ywTpls = new YwTpls();<br /> ywTpls.setKeyno(sxzdProtocol.getKeyno());<br /> ywTpls.setIp(sxzdProtocol.getIp());<br /> ywTpls.setPort(sxzdProtocol.getPort());<br /> ywTpls.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywTpls.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getPicBytesAll()));//无需处理<br /> ywTpls.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ywTpls.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ywTpls.setImei(sxzdProtocol.getSbbhChar());<br /> ywTpls.setTppslx(ByteUtil.bytesToIntHighAhead(sxzdProtocol.getPslx()));<br /> ywTpls.setTpwjwz(sxzdProtocol.getPicUrl());<br /> ywTpls.setAlsbsz(StringUtil.isEmpty(sxzdProtocol.getSbds()) ? BigDecimal.ZERO : new BigDecimal(sxzdProtocol.getSbds()));<br /> ywTpls.setPzsj(sxzdProtocol.getPzsjChar());<br /> if (SxzdDecodeUtil.ywTplsService.getByPrimaryKey(sxzdProtocol.getKeyno()) == null) {<br /> SxzdDecodeUtil.ywTplsService.save(ywTpls);<br /> } else {<br /> SxzdDecodeUtil.ywTplsService.update(ywTpls);<br /> }<br /> //待处理测试使用正式屏蔽数据库异常<br /> //int a = 1/0;<br /> }<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片完整解码信息插入数据库记录出错" + e.getMessage());<br /> ywTpls.setCreatedate(DateUtil.getCurrentDateTime());<br /> ywTplsList.add(ywTpls);<br /> }<br /> }<br /> //图片(平台->设备)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PIC_P2S) {<br /> try {<br /> YwYdtplsXq ywYdtplsXq = new YwYdtplsXq();<br /> ywYdtplsXq.setXqno(sxzdProtocol.getKeynoSub());<br /> ywYdtplsXq.setKeyno(sxzdProtocol.getKeyno());<br /> ywYdtplsXq.setXh(sxzdProtocol.getMsgseqInt());<br /> ywYdtplsXq.setIp(sxzdProtocol.getIp());<br /> ywYdtplsXq.setPort(sxzdProtocol.getPort());<br /> ywYdtplsXq.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ywYdtplsXq.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> ywYdtplsXq.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ywYdtplsXq.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ywYdtplsXq.setImei(sxzdProtocol.getSbbhChar());<br /> ywYdtplsXq.setAesflag(ByteUtil.bytesToIntHighAhead(sxzdProtocol.getJmbs()));<br /> ywYdtplsXq.setJgm(sxzdProtocol.getResultcodeInt() + "");<br /> ywYdtplsXq.setYdsj(DateUtil.getCurrentDateTime());<br /> SxzdEncodeUtil.ywYdtplsXqService.save(ywYdtplsXq);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("图片编码信息插入数据库记录出错" + e.getMessage());<br /> }<br /> }<br /> //参数(设备->平台)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PARAM_S2P){<br /> //记录参数数据主键<br /> YwQqcslsjl qqcslsjl = new YwQqcslsjl();<br /> String dtxxNote = "";<br /> qqcslsjl.setKeyno(sxzdProtocol.getKeyno());<br /> qqcslsjl.setIp(sxzdProtocol.getIp());<br /> qqcslsjl.setPort(sxzdProtocol.getPort());<br /> qqcslsjl.setXxbm(sxzdProtocol.getMsgidInt()+"");<br /> qqcslsjl.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> qqcslsjl.setGjver(sxzdProtocol.getGjverInt()+"");<br /> qqcslsjl.setMyver(sxzdProtocol.getMyverInt()+"");<br /> qqcslsjl.setImei(sxzdProtocol.getSbbhChar());<br /> qqcslsjl.setSbdy(sxzdProtocol.getDyInt());<br /> qqcslsjl.setRsrp((int)sxzdProtocol.getXhqd()[0]);<br /> if(sxzdProtocol.getDecodeMsgbodyArray() != null) {<br /> for (SxzdDecodeMsgBody sxzdDecodeMsgBody : sxzdProtocol.getDecodeMsgbodyArray()){<br /> dtxxNote += sxzdDecodeMsgBody.getFieldRemark()+"\n";<br /> }<br /> }<br /> qqcslsjl.setDtxxNote(dtxxNote);<br /> qqcslsjl.setDtxx(ByteUtil.bytesToHex(sxzdProtocol.getMsgbody()));<br /> qqcslsjl.setQqsj(DateUtil.getCurrentDateTime());<br /> SxzdDecodeUtil.ywQqcslsjlService.save(qqcslsjl);<br /> }<br /> //参数(平台->设备)<br /> else if(sxzdProtocol.getZldmInt() == SxzdProtocol.MSG_HEAD_PARAM_P2S) {<br /> try {<br /> String dtxxNote = "";<br /> YwYdcslsjl ydcslsjl = new YwYdcslsjl();<br /> ydcslsjl.setKeyno(sxzdProtocol.getKeyno());<br /> ydcslsjl.setIp(sxzdProtocol.getIp());<br /> ydcslsjl.setPort(sxzdProtocol.getPort());<br /> ydcslsjl.setXxbm(sxzdProtocol.getMsgidInt() + "");<br /> ydcslsjl.setXxnr(ByteUtil.bytesToHex(sxzdProtocol.getCurrBytes()));<br /> ydcslsjl.setGjver(sxzdProtocol.getGjverInt() + "");<br /> ydcslsjl.setMyver(sxzdProtocol.getMyverInt() + "");<br /> ydcslsjl.setImei(sxzdProtocol.getSbbhChar());<br /> ydcslsjl.setAesflag(ByteUtil.bytesToIntHighAhead( sxzdProtocol.getJmbs()));<br /> ydcslsjl.setJgm(sxzdProtocol.getResultcodeInt() + "");<br /> ydcslsjl.setBgz(sxzdProtocol.getBgzInt());<br /> ydcslsjl.setSgdflag(sxzdProtocol.getSgdflagInt());<br /> ydcslsjl.setXccbsjc(sxzdProtocol.getXccbsjcChar());<br /> ydcslsjl.setXctxsjc(sxzdProtocol.getXctxsjcChar());<br /> ydcslsjl.setCccs(sxzdProtocol.getCccsInt());<br /> ydcslsjl.setCcjgsj(sxzdProtocol.getCcjgsjInt());<br /> if (sxzdProtocol.getEncodeMsgbodyArray() != null) {<br /> for (SxzdEncodeMsgBody sxzdEncodeMsgBody : sxzdProtocol.getEncodeMsgbodyArray()) {<br /> dtxxNote += sxzdEncodeMsgBody.getFieldRemark() + "\n";<br /> }<br /> }<br /> ydcslsjl.setDtxxNote(dtxxNote);<br /> ydcslsjl.setDtxx(ByteUtil.bytesToHex(sxzdProtocol.getMsgbody()));<br /> ydcslsjl.setYdsj(DateUtil.getCurrentDateTime());<br /> SxzdEncodeUtil.ywYdcslsjlService.save(ydcslsjl);<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("参数编码信息插入数据库记录出错" + e.getMessage());<br /> }<br /> }
}<br /> String csvSecondPath = (new SimpleDateFormat("yyyy-MM")).format(new Date())+"/csv/";<br /> //将插入失败的记录,同时保存csv和h2db数据库<br /> if(ywTplsXqList.size() > 0){<br /> LinkedHashMap<String,Object> map = new LinkedHashMap<>();<br /> map.put("分包消息主键","xqno");<br /> map.put("消息主键","keyno");<br /> map.put("消息编码","xxbm");<br /> map.put("序号","xh");<br /> map.put("消息内容","xxnr");<br /> map.put("入库时间","createdate");<br /> MyCsvUtil.log(map,ywTplsXqList,csvSecondPath,"分包图片");<br /> for(YwTplsXq aYwTplsXq: ywTplsXqList) {<br /> try {<br /> SxzdDecodeUtil.ywTplsXqServiceH2.save(aYwTplsXq);<br /> }catch(Exception e){<br /> e.printStackTrace();<br /> log.error("分包图片插入h2db出错"+e.getMessage());<br /> }<br /> }<br /> }<br /> if(ywTplsList.size() > 0){<br /> LinkedHashMap<String,Object> map = new LinkedHashMap<>();<br /> map.put("消息主键","keyno");<br /> map.put("ip","ip");<br /> map.put("端口","port");<br /> map.put("消息编码","xxbm");<br /> map.put("消息内容","xxnr");<br /> map.put("固件版本","gjver");<br /> map.put("密钥版本","myver");<br /> map.put("设备编号","imei");<br /> map.put("图片拍摄类型","tppslx");<br /> map.put("图片文件位置","tpwjwz");<br /> map.put("阿里识别数值","alsbsz");<br /> map.put("拍照时间","pzsj");<br /> map.put("入库时间","createdate");<br /> MyCsvUtil.log(map,ywTplsList,csvSecondPath,"完整图片");<br /> for(YwTpls aYwTpls: ywTplsList) {<br /> try {<br /> if (SxzdDecodeUtil.ywTplsServiceH2.getByPrimaryKey(aYwTpls.getKeyno()) == null) {<br /> SxzdDecodeUtil.ywTplsServiceH2.save(aYwTpls);<br /> } else {<br /> SxzdDecodeUtil.ywTplsServiceH2.update(aYwTpls);<br /> }<br /> }catch (Exception e){<br /> e.printStackTrace();<br /> log.error("分包图片插入h2db出错"+e.getMessage());<br /> }<br /> }<br /> }<br /> }<br /> });<br /> }<br /> catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("本批待数据处理插入数据处理线程池执行出错:" + e.getMessage());<br /> }<br /> }<br />}