1. 分布式批量
1.1 项目简介和整体流程
通过行内提供的分布式批量框架jar包,接入到分布式批量平台。各部门开发自己的执行器和作业,由行内分布式批量平台的控制器进行统一调度。控制器和执行器通过 zookeeper 进行通讯,执行器根据控制器的命令启动不同的作业进行批处理任务
1.2 细节
1.2.1 项目启动
根据 SpringBoot 自动配置原理,实现了两个自动配置类,用于各部门更加方便地使用此分布式批量框架,下面讲自动配置类中较为重点的 bean 进行记录
1. ExecutorPropertiesAutoConfiguration
通过@Bean注解往 IoC 容器中放入作业扫描类JobScanConfig对象实例 jobScanConfig。该类实现了 BeanFactoryPostProcessor 接口,先使用beanFactory.getBeansWithAnnotation()获取所有添加了自定义注解@Processor的类对象实例(即作业的对象实例),并将他们以作业名称为 key,作业对象实例为值存入属性 jobInfoMap 中,后续用来控制作业的执行。
1. ExecutorAutoConfiguration
- 将 properties 文件中的属性值映射到 ExecutorProperties 类对象实例中;
- 创建框架使用的线程池(使用 ThreadPoolExecutor);
- 创建执行器 Executor 类实例对象 executor,创建时会根据 properties 配置文件设置部门应用名称,zookeeper连接等信息,并通过注解执行 start 方法 @
@Bean(initMethod="start")- start 的方法执行,先连接 zookeeper;
- 注册到 zookeeper,创建一个节点用于控制器发现执行器,此节点和自己的 paas 容器 IP 一致;
- 使用 CommandListener 监听上述节点,用于接收控制器发出的命令;
- 若存此节点下存留之前控制器发给此执行器的指令,则先处理这些指令;
创建整体项目的启动类 DefaultLaucher 类实例对象 laucher(用于框架的调试,和各部门使用无关);
1.2.2 执行器执行指定作业
当执行器监听节点的子节点有变化时,会触发该节点监听器的
process(WatcherEvent event)方法:1. 先重新设置监听;
2. 遍历所有待执行的指令,解析指令并执行:
- 如果是停止指令,则从正在运行的作业中找到该作业并将状态置为停止
- 如果此时线程池满了,根据拒绝策略进行处理
如果是执行指令,则将指令提交到线程池,根据指令的内容调起相应的作业
作业类需要实现 Processor 接口或继承行内开发的部分抽象类;
- 添加上
@Processor用于被 jobScanConfig 扫描到; - 开发对应的 pojo (导出成文件或导入文件时使用)和 SQL
- 将作业执行时间提交到分布式批量平台,用于控制器进行定时调度(手工调度则通过分布式批量平台手工提交)
1.3 我进行的优化
1.3.1 针对千万级数据导出场景进行优化,编写了对应的数据导出作业程序
SQL 优化: ```sql — 正确的翻页 select * from tradelog where rowid in (select rowid
order by rowid;from (select t.rowidfrom tradelogwhere 条件 androwid > {#rowid}order by rowid)where rownum <= #{pageSize})
— 错误的翻页
select
from (select t.,
rownum as rn
from (select
from tradelog
order by rowid)
where rownum <= #{end})
where rn > start;
```
*作业多线程优化:
串行 + 传统 IO 的批量导出程序在上线后执行时间过长造成作业阻塞,因此我通过多线程 + 文件 NIO 的方式进行优化,减少了查库和磁盘 IO 的串行化,同时使用 FileChannel 提高了文件写入效率;
1.4 可能遇到的问题
- 如何体现分布式:
首先实现统一调度的控制器是多地部署的,并且由分布式批量平台保证其高可用。各部门应用接入分布式批量平台的执行器通过 PaaS 容器实现集群部署,虽然各部门应用的作业各不相同,但可以由控制器进行统一调度体现了分布式架构
- 作业调度是抢占式还是分配式
是通过控制器统一分配作业给执行器,控制器会根据执行器的反馈考虑是否将作业分配给此执行器,若分配的节点长时间未删除也未收到反馈指令,则将之后的作业分配给其他执行器;
2. 商户之家新公众号
完成 token 生成与校验的系统设计工作,以及前端加密后端解密的系统设计和编码工作:
- 通过 AOP 切面实现 token 的生成与校验,避免了已有接口的配合改造;token 存储采用了 Redis,由于存在 F5 的会话保持,因此采用了 Caffeine 作为降级措施;
- 通过前端将业务数据整体加密,后端使用 Filter 解密的方式,避免了已有的 SRF、XSS 等 Filter 和接口的 配合改造。针对 json 和 x-www-form-urlencoded 两种编码的 POST 请求,创建了两个 HttpServletRequestWra
pper 子类,以实现将解密后的明文数据重新放回请求体并保证可重复读取;
