GitHub

https://github.com/alibaba/DataX
https://github.com/alibaba/DataX/blob/master/introduction.md
https://github.com/alibaba/DataX/blob/master/userGuid.md

JDK安装

https://www.oracle.com/java/technologies/downloads/archive/
版本说明
Linux x86 RPM Package //适用于32bit的centos、rethat(linux)操作系统
Linux x64 RPM Package //适用于64bit的centos、rethat(linux)操作系统
Linux x86 Compressed Archive //适用于32bit的Linux操作系统
Linux x64 Compressed Archive //适用于64bit的Linux操作系统

简介

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
本示例测试使用的sqlserver将一部分表全量同步到mysql。

背景需求

将某个sql server数据库的某个表的数据以离线作业方式同步到另外一个my sql数据库的表中.

添加mysql 驱动

在对mysql数据库进行读写的数据同步时,需要考虑mysql的数据库版本不同时,要自己手工添加上对应的mysql驱动版本.添加方法如下,先下载mysql-connector-java-8.0.27.jar,找到以下的目录(/usr/local是我自己定义的目录),
将mysql-connector-java-8.0.27.jar拷贝到以下目录下,这样就支持对mysql 8.0.27版本的写操作;
驱动版本下载: MySQL Community Downloads

  1. /usr/local/datax/plugin/writer/mysqlwriter/libs/

再找到以下目录(/usr/local是我自己定义的目录),

  1. /usr/local/datax/plugin/reader/mysqlreader/libs/

将mysql-connector-java-8.0.27.jar拷贝到以下目录下,这样就支持对mysql 8.0.27版本的读操作; :::warning 注意:在win环境下,mysql-connector-java连接驱动,如果有多个版本同时存在,dataX会选择低版本的连接驱动使用,这时选择的的连接驱动不一定对应上你安装的MySql数据库版本,这时只能保留其中一个版本才能正常执行作业.但测试时,在Linux环境下时,如果存在多版本的连接驱动,程序又能正常执行,只要存在对应Mysql数据库版本的连接驱动就行. ::: 举例,我的mysql 是8.0.27版本,需要使用mysql-connector-java-8.0.27.jar,但如果同时存在着默认的mysql-connector-java-5.1.34.jar,执行作业就会报错
1665471628052.png
1665471756238.png

查看配置模板

  1. python3 /usr/local/datax/bin/datax.py -r sqlserverreader -w mysqlwriter
  1. {
  2. "job": {
  3. "content": [
  4. {
  5. "reader": {
  6. "name": "sqlserverreader",
  7. "parameter": {
  8. "connection": [
  9. {
  10. "jdbcUrl": [],
  11. "table": []
  12. }
  13. ],
  14. "password": "",
  15. "username": ""
  16. }
  17. },
  18. "writer": {
  19. "name": "mysqlwriter",
  20. "parameter": {
  21. "column": [],
  22. "connection": [
  23. {
  24. "jdbcUrl": "",
  25. "table": []
  26. }
  27. ],
  28. "password": "",
  29. "preSql": [],
  30. "session": [],
  31. "username": "",
  32. "writeMode": ""
  33. }
  34. }
  35. }
  36. ],
  37. "setting": {
  38. "speed": {
  39. "channel": ""
  40. }
  41. }
  42. }
  43. }

配置文件模板

  1. {
  2. "job": {
  3. "content": [
  4. {
  5. "reader": {
  6. "name": "mysqlreader", # 读取端
  7. "parameter": {
  8. "column": [], # 需要同步的列 (* 表示所有的列)
  9. "connection": [
  10. {
  11. "jdbcUrl": [], # 连接信息
  12. "table": [] # 连接表
  13. }
  14. ],
  15. "password": "", # 连接用户
  16. "username": "", # 连接密码
  17. "where": "" # 描述筛选条件
  18. }
  19. },
  20. "writer": {
  21. "name": "mysqlwriter", # 写入端
  22. "parameter": {
  23. "column": [], # 需要同步的列
  24. "connection": [
  25. {
  26. "jdbcUrl": "", # 连接信息
  27. "table": [] # 连接表
  28. }
  29. ],
  30. "password": "", # 连接密码
  31. "preSql": [], # 同步前. 要做的事
  32. "session": [],
  33. "username": "", # 连接用户
  34. "writeMode": "" # 操作类型
  35. }
  36. }
  37. }
  38. ],
  39. "setting": {
  40. "speed": {
  41. "channel": "" # 指定并发数
  42. }
  43. }
  44. }
  45. }

配置文件1

  1. cd /usr/local/datax/job/
  1. vim mssqltomysql-published.json
  1. {
  2. "job": {
  3. "content": [
  4. {
  5. "reader": {
  6. "name": "sqlserverreader",
  7. "parameter": {
  8. "column": [
  9. "Id",
  10. "Version",
  11. "Name",
  12. "Content",
  13. "Retries",
  14. "Added",
  15. "ExpiresAt",
  16. "StatusName"
  17. ],
  18. "connection": [
  19. {
  20. "jdbcUrl": [
  21. "jdbc:sqlserver://192.168.3.40:1433;DatabaseName=es_test"
  22. ],
  23. "table": [
  24. "cap_published"
  25. ]
  26. }
  27. ],
  28. "password": "longfuchu",
  29. "username": "sa"
  30. }
  31. },
  32. "writer": {
  33. "name": "mysqlwriter",
  34. "parameter": {
  35. "column": [
  36. "Id",
  37. "Version",
  38. "Name",
  39. "Content",
  40. "Retries",
  41. "Added",
  42. "ExpiresAt",
  43. "StatusName"
  44. ],
  45. "connection": [
  46. {
  47. "jdbcUrl": "jdbc:mysql://192.168.3.40:3306/es_test?connectTimeout=10000&characterEncoding=utf-8&useSSL=false",
  48. "table": [
  49. "cap_published"
  50. ]
  51. }
  52. ],
  53. "password": "123456",
  54. "preSql": [],
  55. "session": [],
  56. "username": "root",
  57. "writeMode": "insert"
  58. }
  59. }
  60. }
  61. ],
  62. "setting": {
  63. "speed": {
  64. "channel": "5"
  65. }
  66. }
  67. }
  68. }

验证

  1. python3 /usr/local/datax/bin/datax.py /usr/local/datax/job/mssqltomysql-published.json

如果出现以下错误提示

  1. 2022-10-10 12:56:04.693 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
  2. 2022-10-10 12:56:04.696 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
  3. 2022-10-10 12:56:04.697 [main] INFO JobContainer - DataX jobContainer starts job.
  4. 2022-10-10 12:56:04.699 [main] INFO JobContainer - Set jobId = 0
  5. 2022-10-10 12:56:05.311 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:sqlserver://192.168.3.40:1433;DatabaseName=es_test.
  6. 2022-10-10 12:56:05.484 [job-0] INFO OriginalConfPretreatmentUtil - table:[cap_published] has columns:[Id,Version,Name,Content,Retries,Added,ExpiresAt,StatusName].
  7. 2022-10-10 12:56:05.724 [job-0] ERROR RetryUtil - Exception when calling callable, 异常Msg:Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IPPort或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.
  8. com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IPPort或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.
  9. at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
  10. at com.alibaba.datax.plugin.rdbms.util.RdbmsException.asConnException(RdbmsException.java:23) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  11. at com.alibaba.datax.plugin.rdbms.util.DBUtil.connect(DBUtil.java:394) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  12. at com.alibaba.datax.plugin.rdbms.util.DBUtil.connect(DBUtil.java:384) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  13. at com.alibaba.datax.plugin.rdbms.util.DBUtil.access$000(DBUtil.java:22) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  14. at com.alibaba.datax.plugin.rdbms.util.DBUtil$3.call(DBUtil.java:322) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  15. at com.alibaba.datax.plugin.rdbms.util.DBUtil$3.call(DBUtil.java:319) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  16. at com.alibaba.datax.common.util.RetryUtil$Retry.call(RetryUtil.java:164) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
  17. at com.alibaba.datax.common.util.RetryUtil$Retry.doRetry(RetryUtil.java:111) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
  18. at com.alibaba.datax.common.util.RetryUtil.executeWithRetry(RetryUtil.java:30) [datax-common-0.0.1-SNAPSHOT.jar:na]
  19. at com.alibaba.datax.plugin.rdbms.util.DBUtil.getConnection(DBUtil.java:319) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  20. at com.alibaba.datax.plugin.rdbms.util.DBUtil.getConnection(DBUtil.java:303) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  21. at com.alibaba.datax.plugin.rdbms.util.JdbcConnectionFactory.getConnecttion(JdbcConnectionFactory.java:27) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  22. at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:105) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  23. at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:140) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  24. at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.doPretreatment(OriginalConfPretreatmentUtil.java:35) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  25. at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Job.init(CommonRdbmsWriter.java:41) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
  26. at com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter$Job.init(MysqlWriter.java:31) [mysqlwriter-0.0.1-SNAPSHOT.jar:na]
  27. at com.alibaba.datax.core.job.JobContainer.initJobWriter(JobContainer.java:704) [datax-core-0.0.1-SNAPSHOT.jar:na]
  28. at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:304) [datax-core-0.0.1-SNAPSHOT.jar:na]
  29. at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113) [datax-core-0.0.1-SNAPSHOT.jar:na]
  30. at com.alibaba.datax.core.Engine.start(Engine.java:92) [datax-core-0.0.1-SNAPSHOT.jar:na]
  31. at com.alibaba.datax.core.Engine.entry(Engine.java:171) [datax-core-0.0.1-SNAPSHOT.jar:na]
  32. at com.alibaba.datax.core.Engine.main(Engine.java:204) [datax-core-0.0.1-SNAPSHOT.jar:na]
  33. 2022-10-10 12:56:06.727 [job-0] ERROR RetryUtil - Exception when calling callable, 即将尝试执行第1次重试.本次重试计划等待[1000]ms,实际等待[1001]ms, 异常Msg:[Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IPPort或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.]
  34. 2022-10-10 12:56:08.733 [job-0] ERROR RetryUtil - Exception when calling callable, 即将尝试执行第2次重试.本次重试计划等待[2000]ms,实际等待[2000]ms, 异常Msg:[Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IPPort或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server.]
  35. ^CTraceback (most recent call last):
  36. File "/usr/local/datax/bin/datax.py", line 225, in <module>
  37. (stdout, stderr) = child_process.communicate()
  38. File "/usr/lib/python3.10/subprocess.py", line 1144, in communicate
  39. self.wait()
  40. File "/usr/lib/python3.10/subprocess.py", line 1207, in wait
  41. return self._wait(timeout=timeout)
  42. File "/usr/lib/python3.10/subprocess.py", line 1941, in _wait
  43. (pid, sts) = self._try_wait(0)
  44. File "/usr/lib/python3.10/subprocess.py", line 1899, in _try_wait
  45. (pid, sts) = os.waitpid(self.pid, wait_flags)
  46. File "/usr/local/datax/bin/datax.py", line 55, in suicide
  47. print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)
  48. TypeError: unsupported operand type(s) for >>: 'builtin_function_or_method' and '_io.TextIOWrapper'. Did you mean "print(<message>, file=<output_stream>)"?

除了检查数据库的各项配置外,还需要注意是否因为是数据库的版本问题,如我的mysql数据库版本是8.0.27,但dataX默认的驱动版本是mysql-connector-java-5.1.34.jar,这里就需要把8.0.27版本的驱动程序也要添加到dataX里面
执行结果
1665378494286.png

配置文件2

  1. {
  2. "job": {
  3. "content": [
  4. {
  5. "reader": {
  6. "name": "sqlserverreader",
  7. "parameter": {
  8. "column": [
  9. "id",
  10. "deptno",
  11. "deptname",
  12. "bloodno",
  13. "bloodname",
  14. "boutcount",
  15. "bloodunitname",
  16. "bodate"
  17. ],
  18. "connection": [
  19. {
  20. "jdbcUrl": [
  21. "jdbc:sqlserver://192.168.3.40:1433;DatabaseName=es_test"
  22. ],
  23. "table": [
  24. "V_Blood_BOutItem"
  25. ]
  26. }
  27. ],
  28. "password": "longfuchu",
  29. "username": "sa"
  30. }
  31. },
  32. "writer": {
  33. "name": "mysqlwriter",
  34. "parameter": {
  35. "column": [
  36. "id",
  37. "deptno",
  38. "deptname",
  39. "bloodno",
  40. "bloodname",
  41. "boutcount",
  42. "bloodunitname",
  43. "bodate"
  44. ],
  45. "connection": [
  46. {
  47. "jdbcUrl": "jdbc:mysql://192.168.3.40:3306/es_test?connectTimeout=10000&characterEncoding=utf-8&useSSL=false",
  48. "table": [
  49. "V_Blood_BOutItem"
  50. ]
  51. }
  52. ],
  53. "password": "123456",
  54. "preSql": [],
  55. "session": [],
  56. "username": "root",
  57. "writeMode": "insert"
  58. }
  59. }
  60. }
  61. ],
  62. "setting": {
  63. "speed": {
  64. "channel": "5"
  65. }
  66. }
  67. }
  68. }

验证

  1. python3 /usr/local/datax/bin/datax.py /usr/local/datax/job/mssqltomysql-outitems.json