启动job命令

  1. python datax/bin/datax.py xx.json

记录一次通过参数执行的job

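以下是配置文件(示例中的用户名、密码和数据库地址仅作演示,请替换为自己的配置,不要把真实密码提交到代码库;其中 $where、$projectid、$createUser、$createDept 是启动时通过 -p 传入的动态参数)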

  {
    "job": {
      "setting": {
        "speed": {
          "channel": 8,
          "record": -1,
          "byte": -1,
          "batchSize": 2048
        }
      },
      "content": [
        {
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "username": "qaqcsync",
              "password": "Jingye-00",
              "column": [
                "recid",
                "orgid",
                "projectid",
                "type",
                "blockid",
                "blockname",
                "floorcnt",
                "unitscnt",
                "images",
                "$createUser",
                "$createDept",
                "now()",
                "$createUser",
                "now()",
                "1",
                "0",
                "$projectid",
                "0"
              ],
              "where": "projectid='$where'",
              "connection": [
                {
                  "table": [
                    "basedata_blocklist"
                  ],
                  "jdbcUrl": [
                    "jdbc:mysql://192.168.102.205:3306/aiis?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true"
                  ]
                }
              ]
            }
          },
          "writer": {
            "name": "mysqlwriter",
            "parameter": {
              "writeMode": "insert",
              "username": "qcdm",
              "password": "Hzky@2020",
              "column": [
                "rec_id",
                "org_code",
                "project_code",
                "type",
                "block_code",
                "block_name",
                "floor_cnt",
                "units_cnt",
                "images",
                "create_user",
                "create_dept",
                "create_time",
                "update_user",
                "update_time",
                "status",
                "is_deleted",
                "project_id",
                "sort"
              ],
              "preSql": [
                "update qcdm_project_block set qcdm_project_block.is_deleted=1 where qcdm_project_block.project_code='$where' AND qcdm_project_block.is_deleted=0 AND qcdm_project_block.project_id='$projectid' AND qcdm_project_block.project_code='$where'"
              ],
              "postSql": [
                "update qcdm_project_block set qcdm_project_block.building_code=CONCAT(qcdm_project_block.project_code,'-',qcdm_project_block.block_code) where qcdm_project_block.is_deleted=0 and qcdm_project_block.project_code='$where' and qcdm_project_block.project_id='$projectid'",
                "insert into qcdm_project_sycn_logs VALUES(0,now())"
              ],
              "connection": [
                {
                  "jdbcUrl": "jdbc:mysql://192.168.102.208:3306/qcdm?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true",
                  "table": [
                    "qcdm_project_block"
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }

pom依赖

  <properties>
    <!-- Version of the Apache Commons Exec dependency declared below -->
    <apache.version>1.3</apache.version>
  </properties>
  <dependencies>
    <!-- Apache Commons Exec (commons-exec) -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-exec</artifactId>
      <version>${apache.version}</version>
    </dependency>
  </dependencies>

java调用程序

  1. @SneakyThrows
  2. @Override
  3. public boolean doTask(String jsonPath, ProjectBlock projectBlock) {
  4. int start = logsService.count();
  5. File[] files = getFileLists(jsonPath);
  6. for (File file : files
  7. ) {
  8. CommandLine cmdLine = new CommandLine(paramService.getValue("PYTHON_PATH"));
  9. cmdLine.addArgument(paramService.getValue("DATAX_PATH"));
  10. cmdLine.addArgument("-p");
  11. cmdLine.addArgument("-Dwhere=" + projectBlock.getProjectCode() + " " + "-Dprojectid=" + projectBlock.getProjectId().toString() + " " + "-DcreateUser=" + projectBlock.getCreateUser() + " " + "-DcreateDept=" + projectBlock.getCreateDept(), false);
  12. // cmdLine.addArgument("-Dwhere=QP201608 -Dprojectid=1232257586504736770 -DcreateUser=1123598821738675201 -DcreateDept=1123598813738675201", false);
  13. cmdLine.addArgument(file.getAbsolutePath());
  14. DefaultExecutor executor = new DefaultExecutor();
  15. //executor.setExitValue(1);
  16. System.err.println("cmdLine:" + cmdLine.toString());
  17. ExecuteWatchdog watchdog = new ExecuteWatchdog(600000);
  18. executor.setWatchdog(watchdog);
  19. int exitValue = executor.execute(cmdLine);
  20. System.out.println("exitValue:" + exitValue);
  21. }
  22. int end = logsService.count();
  23. return end == (start + files.length);
  24. }

总结:

优化:

全局调优(core 模块与 job 模块同级,需要写在 job 模块之前)

注意:下面的片段只是配置文件的开头部分,最外层的 { 没有闭合,后面还要接 "job" 模块;## 后面的文字是说明,JSON 不支持注释,实际使用时需要删除。

  1. {
  2. "core":{
  3. "transport":{
  4. "channel":{
  5. "speed":{
  6. "channel": 2,## 此处为数据导入的并发度,建议根据服务器硬件进行调优
  7. "record":-1,##此处解除对读取行数的限制
  8. "byte":-1,##此处解除对字节的限制
  9. "batchSize":2048 ##每次读取batch的大小
  10. }
  11. }
  12. }
  13. }

局部调优(job 内的 setting 模块;下面的片段是配置文件的结尾部分,最后两个 } 分别闭合 job 和最外层对象)

  1. "setting": {
  2. "speed": {
  3. "channel": 2,
  4. "record":-1,
  5. "byte":-1,
  6. "batchSize":2048
  7. }
  8. }
  9. }
  10. }

JVM 调优(此处根据服务器配置进行调优,切记堆内存不可设置得太大,否则会直接抛出异常)

  1. python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json

以上为调优方法,应该可以针对每个 json 文件分别进行调优

动态传参

如果需要导入数据的表太多而表的格式又相同,可以进行json文件的复用,举个简单的例子:

  1. python datax.py -p "-Dsdbname=test -Dstable=test" ../job/test.json

对应的 json 片段(用 ${变量名} 引用 -p 中通过 -D 传入的参数):

  1. "column": ["*"],
  2. "connection": [
  3. {
  4. "jdbcUrl": "jdbc:mysql://XXXXXX:XX/${sdbname}?characterEncoding=utf-8",
  5. "table": ["${stable}"]
  6. }
  7. ],