DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

DataX 也是使用这种可插拔的设计,采用了 Framework + Plugin 的架构设计

DataX - 图1

DataX - 图2
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    Job执行

    首先,Job根据不同的分片策略,将任务分为多个任务组,每个组包含等量的子任务。然后提交给调度器,调度器启动后,执行读写逻辑线程。如下图示:
    image.png
    大体的交互流程如下示:
    image.png

    调用方式

    运行

    命令行调用:python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    其中,’datax.py’ 是python编写的调用入口。

    本地debug

    Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑。
    com.alibaba.datax.core.Engine 的main方法。
    image.png

    核心类介绍

    JobContainer

    jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、post以及destroy和statistics。
    JobContainer的schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中,同时不同的执行模式调用不同的调度策略,将所有任务调度起来。
    将调度执行逻辑委派给AbstractScheduler的schedule(taskGroupConfigs),方法中
    image.png

    AbstractScheduler

    AbstractScheduler的子类执行任务线程,使用固定大小线程池管理taskGroup。
    image.png

    TaskGroupContainerRunner

    TaskGroupContainerRunner的run方法,执行TaskGroupContainer的start方法。这样,一个任务组就开始在线程池中运行了。
    image.png

    TaskExecutor

    TaskExecutor是一个完整task的执行器,内部包含reader和writer线程。

    读写组件

    以mysql为例。读写组件类图如下示:
    image.png
    image.png
    reader和writer内部结构类似,核心内部类Job和Task。
    Job主要负责一系列的环境处理Task完成了读取或写入的实现逻辑
  1. {
  2. "job": {
  3. "content": [
  4. {
  5. "reader": {
  6. "name": "mysqlreader",
  7. "parameter": {
  8. "connection": [
  9. {
  10. "querySql": ["SELECT * FROM table1"],
  11. "jdbcUrl": ["jdbc:mysql://localhost:3306/datax?useSSL=false&useUnicode=true&characterEncoding=UTF-8"]
  12. }
  13. ],
  14. "password": "root",
  15. "username": "root"
  16. }
  17. },
  18. "writer": {
  19. "name": "mysqlwriter",
  20. "parameter": {
  21. "column": ["*"],
  22. "writeMode": "update",
  23. "preSql": [
  24. "delete from table2"
  25. ],
  26. "connection": [
  27. {
  28. "jdbcUrl": "jdbc:mysql://localhost:3306/datax?useSSL=false&useUnicode=true&characterEncoding=UTF-8",
  29. "table": ["table2"]
  30. }
  31. ],
  32. "password": "root",
  33. "username": "root"
  34. }
  35. }
  36. }
  37. ],
  38. "setting": {
  39. "speed": {
  40. "byte": 1048576,
  41. "channel": 5
  42. }
  43. }
  44. }
  45. }

源码

fork了datax的源码,后续进行源码分析,增加代码注释。DataX源码分析地址

源码解析