Command to start a job
python datax/bin/datax.py xx.json
A record of one job run with parameters passed on the command line
The configuration file is as follows:
{
  "job": {
    "setting": {
      "speed": {
        "channel": 8,
        "record": -1,
        "byte": -1,
        "batchSize": 2048
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "qaqcsync",
            "password": "Jingye-00",
            "column": [
              "recid", "orgid", "projectid", "type", "blockid", "blockname",
              "floorcnt", "unitscnt", "images", "$createUser", "$createDept", "now()",
              "$createUser", "now()", "1", "0", "$projectid", "0"
            ],
            "where": "projectid='$where'",
            "connection": [
              {
                "table": ["basedata_blocklist"],
                "jdbcUrl": ["jdbc:mysql://192.168.102.205:3306/aiis?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "qcdm",
            "password": "Hzky@2020",
            "column": [
              "rec_id", "org_code", "project_code", "type", "block_code", "block_name",
              "floor_cnt", "units_cnt", "images", "create_user", "create_dept", "create_time",
              "update_user", "update_time", "status", "is_deleted", "project_id", "sort"
            ],
            "preSql": [
              "update qcdm_project_block set qcdm_project_block.is_deleted=1 where qcdm_project_block.project_code='$where' AND qcdm_project_block.is_deleted=0 AND qcdm_project_block.project_id='$projectid' AND qcdm_project_block.project_code='$where'"
            ],
            "postSql": [
              "update qcdm_project_block set qcdm_project_block.building_code=CONCAT(qcdm_project_block.project_code,'-',qcdm_project_block.block_code) where qcdm_project_block.is_deleted=0 and qcdm_project_block.project_code='$where' and qcdm_project_block.project_id='$projectid'",
              "insert into qcdm_project_sycn_logs VALUES(0,now())"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://192.168.102.208:3306/qcdm?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true",
                "table": ["qcdm_project_block"]
              }
            ]
          }
        }
      }
    ]
  }
}
pom dependencies
<properties>
    <!-- Apache Commons Exec version -->
    <apache.version>1.3</apache.version>
</properties>
<dependencies>
    <!-- commons-execCmd -->
    <!-- Apache Commons Exec -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-exec</artifactId>
        <version>${apache.version}</version>
    </dependency>
</dependencies>
Java calling program
@SneakyThrows
@Override
public boolean doTask(String jsonPath, ProjectBlock projectBlock) {
    // Log row count before the run (assuming logsService counts qcdm_project_sycn_logs)
    int start = logsService.count();
    File[] files = getFileLists(jsonPath);
    for (File file : files) {
        CommandLine cmdLine = new CommandLine(paramService.getValue("PYTHON_PATH"));
        cmdLine.addArgument(paramService.getValue("DATAX_PATH"));
        cmdLine.addArgument("-p");
        // All -D pairs go in as one argument; handleQuoting=false keeps Commons Exec from quoting it
        cmdLine.addArgument("-Dwhere=" + projectBlock.getProjectCode()
                + " " + "-Dprojectid=" + projectBlock.getProjectId().toString()
                + " " + "-DcreateUser=" + projectBlock.getCreateUser()
                + " " + "-DcreateDept=" + projectBlock.getCreateDept(), false);
        // cmdLine.addArgument("-Dwhere=QP201608 -Dprojectid=1232257586504736770 -DcreateUser=1123598821738675201 -DcreateDept=1123598813738675201", false);
        cmdLine.addArgument(file.getAbsolutePath());
        DefaultExecutor executor = new DefaultExecutor();
        //executor.setExitValue(1);
        System.err.println("cmdLine:" + cmdLine.toString());
        // Kill the process if it runs longer than 600000 ms (10 minutes)
        ExecuteWatchdog watchdog = new ExecuteWatchdog(600000);
        executor.setWatchdog(watchdog);
        int exitValue = executor.execute(cmdLine);
        System.out.println("exitValue:" + exitValue);
    }
    int end = logsService.count();
    // Each job's postSql inserts one log row, so success means one new row per file
    return end == (start + files.length);
}
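The doTask method above concatenates the -D pairs by hand. As a rough sketch only, the same string could be assembled from a map. DataxParams and toParamString are hypothetical names introduced here for illustration and are not part of DataX or Commons Exec. The whitespace check is also an assumption: the pairs are separated by spaces inside a single argument, so a value containing whitespace would probably be split.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of DataX or Commons Exec.
final class DataxParams {

    // Joins the entries into "-Dkey=value -Dkey=value", preserving map order.
    static String toParamString(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner(" ");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue();
            // Assumption: pairs are space-separated, so reject null, empty,
            // or whitespace-containing values before they reach the command line.
            if (value == null || value.isEmpty()
                    || value.chars().anyMatch(Character::isWhitespace)) {
                throw new IllegalArgumentException("Invalid value for " + entry.getKey());
            }
            joiner.add("-D" + entry.getKey() + "=" + value);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("where", "QP201608");
        params.put("projectid", "1232257586504736770");
        System.out.println(toParamString(params));
    }
}
```

The resulting string could then be passed as cmdLine.addArgument(DataxParams.toParamString(params), false), in the same position as the hand-built string.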
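Summary:
Optimization:
Global tuning (goes in the JSON module placed before the job module)
{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "channel": 2,       ## concurrency of the data import; tune it to the server hardware
          "record": -1,       ## removes the limit on the number of rows read
          "byte": -1,         ## removes the limit on bytes
          "batchSize": 2048   ## size of each batch read
        }
      }
    }
  }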
Local tuning (the last module)
    "setting": {
      "speed": {
        "channel": 2,
        "record": -1,
        "byte": -1,
        "batchSize": 2048
      }
    }
  }
}
JVM tuning (tune this to the server's configuration; do not set it too large, or the job fails immediately with an Exception)
python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
That covers the tuning options; it should be possible to tune each JSON file individually.
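When the job is launched from Java rather than a shell, the JVM tuning command above could be assembled as an argument list. This is a minimal sketch under stated assumptions: DataxCommand and build are hypothetical names, and the heap size is passed as a whole number of gigabytes applied to both -Xms and -Xmx, as in the command above. Each list element reaches the process as one argument, so the shell quotes around the --jvm value are not needed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of DataX.
final class DataxCommand {

    // Builds: <python> <datax.py> --jvm=-Xms<N>G -Xmx<N>G <jobFile>
    static List<String> build(String python, String dataxPy, int heapGb, String jobFile) {
        if (heapGb <= 0) {
            throw new IllegalArgumentException("heapGb must be positive");
        }
        List<String> command = new ArrayList<>();
        command.add(python);
        command.add(dataxPy);
        // One list element, so no shell quoting is required around the value.
        command.add("--jvm=-Xms" + heapGb + "G -Xmx" + heapGb + "G");
        command.add(jobFile);
        return command;
    }

    public static void main(String[] args) {
        System.out.println(build("python", "datax.py", 3, "../job/test.json"));
    }
}
```

The list can be handed to new ProcessBuilder(command). Choosing heapGb is still a manual decision based on the memory available on the server.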
Dynamic parameter passing
If many tables need to be imported and they all share the same format, a single JSON file can be reused. A simple example:
python datax.py -p "-Dsdbname=test -Dstable=test" ../job/test.json
"column": ["*"],
"connection": [
  {
    "jdbcUrl": "jdbc:mysql://XXXXXX:XX/${sdbname}?characterEncoding=utf-8",
    "table": ["${stable}"]
  }
],

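To make the effect of the -D parameters concrete, the sketch below imitates the substitution of ${name} placeholders in a template string. It is an illustration only and not DataX's actual implementation. PlaceholderDemo and substitute are hypothetical names, and leaving unknown placeholders untouched is a choice made for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only; this is not how DataX itself is implemented.
final class PlaceholderDemo {

    // Matches ${name}, where name consists of word characters.
    private static final Pattern VAR = Pattern.compile("\\$\\{(\\w+)\\}");

    static String substitute(String template, Map<String, String> vars) {
        Matcher matcher = VAR.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = vars.get(matcher.group(1));
            // Unknown placeholders are left as they are (a choice for this sketch).
            String replacement = (value != null) ? value : matcher.group();
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("sdbname", "test");
        vars.put("stable", "test");
        String url = "jdbc:mysql://XXXXXX:XX/${sdbname}?characterEncoding=utf-8";
        System.out.println(substitute(url, vars));
        System.out.println(substitute("${stable}", vars));
    }
}
```

Running one job file per table then comes down to changing the values passed with -p, while the JSON file itself stays the same.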