分布式系统最基本的思维方式是去中心化把大的计算任务分而治之; 无论是数据库后期的扩展还是分布式任务调度扩展,都是通过集群去提高系统的算力。

elastic-job

官网:官网链接
elastic底层的任务调度还是使用的 quartz, 通过 zookeeper来动态给 job节点分片;
**

功能

a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。

b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。

c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片

d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据 Zookeeper 的数据管理和监控 elastic-job。

e) failover 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。

架构

image.png

作业启动

应用服务器启动时,向 zk 注册服务

image.png

zk 中的结构如下:

image.png

config 存储 job 配置 json信息:

  1. {
  2. "jobName":"com.alibaba.fota.job.SimpleJobDemo",
  3. "jobClass":"com.alibaba.fota.job.SimpleJobDemo",
  4. "jobType":"SIMPLE",
  5. "cron":"0/1 * * * * ?",
  6. "shardingTotalCount":1,
  7. "shardingItemParameters":"0=A",
  8. "jobParameter":"parameter",
  9. "failover":false,
  10. "misfire":true,
  11. "description":"",
  12. "jobProperties":{
  13. "job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",
  14. "executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"
  15. },
  16. "monitorExecution":true,
  17. "maxTimeDiffSeconds":-1,
  18. "monitorPort":-1,
  19. "jobShardingStrategyClass":"",
  20. "reconcileIntervalMinutes":10,
  21. "disabled":false,
  22. "overwrite":true
  23. }

作业执行

作业执行由 quarz 触发

image.png

分片逻辑(核心)

1. 什么时候需要分片

  1. 通过 elastic-job 控制台修改 job 配置信息时;
  2. 作业服务器上下线时;
  3. 作业第一次启动;

2. 源码分析

作业启动会注册所有监听器

  1. /**
  2. * 开启所有监听器.
  3. */
  4. public void startAllListeners() {
  5. electionListenerManager.start();
  6. shardingListenerManager.start();
  7. failoverListenerManager.start();
  8. monitorExecutionListenerManager.start();
  9. shutdownListenerManager.start();
  10. triggerListenerManager.start();
  11. rescheduleListenerManager.start();
  12. guaranteeListenerManager.start();
  13. jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
  14. }

ShardingListenerManager.start() 注册了下面两个监听器:
ShardingTotalCountChangedJobListener: 分片数修改
ListenServersChangedJobListener :服务器数量变化

  1. // 内存里的分片数与 zk config 节点里面的配置数量不一致
  2. class ShardingTotalCountChangedJobListener extends AbstractJobListener {
  3. @Override
  4. protected void dataChanged(final String path, final Type eventType,
  5. final String data) {
  6. if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance()
  7. .getCurrentShardingTotalCount(jobName)) {
  8. int newShardingTotalCount = LiteJobConfigurationGsonFactory
  9. .fromJson(data).getTypeConfig().getCoreConfig()
  10. .getShardingTotalCount();
  11. if (newShardingTotalCount != JobRegistry.getInstance()
  12. .getCurrentShardingTotalCount(jobName)) {
  13. // zk 上创建临时节点 n
  14. shardingService.setReshardingFlag();
  15. JobRegistry.getInstance().setCurrentShardingTotalCount
  16. (jobName, newShardingTotalCount);
  17. }
  18. }
  19. }
  20. }
  21. 当新的分片节点加入或原的分片实例宕机后,需要进行重新分片。
  22. ${namespace}/jobname/servers${namespace}/jobname/instances
  23. 路径下的节点数量是否发生变化,如果检测到发生变化,设置需要重新分片标识。
  24. class ListenServersChangedJobListener extends AbstractJobListener {
  25. @Override
  26. protected void dataChanged(final String path, final Type eventType,
  27. final String data) {
  28. if (!JobRegistry.getInstance().isShutdown(jobName)
  29. && (isInstanceChange(eventType, path) || isServerChange(path))) {
  30. shardingService.setReshardingFlag();
  31. }
  32. }
  33. private boolean isInstanceChange(final Type eventType, final String path) {
  34. return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
  35. }
  36. private boolean isServerChange(final String path) {
  37. return serverNode.isServerPath(path);
  38. }
  39. }
  40. /**
  41. * 设置需要重新分片的标记.
  42. */
  43. public void setReshardingFlag() {
  44. jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
  45. }

具体分片逻辑在定时任务调度每次调度时发生:
AbstractElasticJobExecutor.execute() 方法
ShardingContexts shardingContexts = jobFacade.getShardingContexts();

/**
 * 执行作业
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    // 分片
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
 }

// 执行具体分片逻辑
public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    shardingService.shardingIfNecessary();
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}


public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}

3.分片流程图

作业启动,开启节点监听器,如果监听到节点变化,生成 分片临时节点;

分片监听.png

每次作业执行时,如果是主节点,且需要分片。主节点会分片,将分片信息持久化到zk;

sharding.png

参考:博客链接