Command to start a job:
python datax/bin/datax.py xx.json
A record of one job run with parameters passed in on the command line.
The configuration file is as follows:
{
"job":{
"setting":{
"speed":{
"channel": 8,
"record":-1,
"byte":-1,
"batchSize":2048
}
},
"content":[{
"reader":{
"name":"mysqlreader",
"parameter":{
"username":"qaqcsync",
"password":"Jingye-00",
"column":["recid","orgid","projectid","type","blockid","blockname","floorcnt","unitscnt","images","$createUser","$createDept","now()","$createUser","now()","1","0","$projectid","0"],
"where":"projectid='$where'",
"connection":[{
"table":[
"basedata_blocklist"
],
"jdbcUrl":[
"jdbc:mysql://192.168.102.205:3306/aiis?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true"
]
}]
}
},
"writer":{
"name":"mysqlwriter",
"parameter":{
"writeMode":"insert",
"username":"qcdm",
"password":"Hzky@2020",
"column":["rec_id","org_code","project_code","type","block_code","block_name","floor_cnt","units_cnt","images","create_user","create_dept","create_time","update_user","update_time","status","is_deleted","project_id","sort"],
"preSql":[
"update qcdm_project_block set qcdm_project_block.is_deleted=1 where qcdm_project_block.project_code='$where' AND qcdm_project_block.is_deleted=0 AND qcdm_project_block.project_id='$projectid'"
],
"postSql":[
"update qcdm_project_block set qcdm_project_block.building_code=CONCAT(qcdm_project_block.project_code,'-',qcdm_project_block.block_code) where qcdm_project_block.is_deleted=0 and qcdm_project_block.project_code='$where' and qcdm_project_block.project_id='$projectid'",
"insert into qcdm_project_sycn_logs VALUES(0,now())"
],
"connection":[{
"jdbcUrl":"jdbc:mysql://192.168.102.208:3306/qcdm?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true",
"table":[
"qcdm_project_block"
]
}]
}
}
}]
}
}
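The `$where`, `$projectid`, `$createUser` and `$createDept` placeholders in this config are filled from the `-D` values passed to datax.py with `-p`. To preview what a rendered config will look like before running the job, a minimal sketch of that substitution is below. It assumes placeholders take the form `$name` or `${name}` where `name` is made of letters, digits and underscores; the class and method names are hypothetical and this is an approximation, not DataX's own code.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderPreview {
    // Assumption: placeholders look like $name or ${name}, where name is [A-Za-z0-9_]+
    private static final Pattern VAR = Pattern.compile("\\$\\{?(\\w+)\\}?");

    /** Replaces each known placeholder with its value; unknown names are left untouched. */
    public static String render(String template, Map<String, String> params) {
        Matcher m = VAR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = params.get(m.group(1));
            String replacement = value != null ? value : m.group();
            // quoteReplacement keeps '$' or '\' in values from being read as group references
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String where = "\"where\":\"projectid='$where'\"";
        System.out.println(render(where, Map.of("where", "QP201608")));
    }
}
```

Leaving unknown placeholders as-is makes a missing `-D` value easy to spot in the preview instead of silently producing an empty string.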
pom dependency:
<properties>
<!-- Apache Commons Exec version -->
<apache.version>1.3</apache.version>
</properties>
<dependencies>
<!-- Apache Commons Exec (commons-exec), used to run the datax.py command -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>${apache.version}</version>
</dependency>
</dependencies>
Java code that invokes DataX:
@SneakyThrows
@Override
public boolean doTask(String jsonPath, ProjectBlock projectBlock) {
    // Each job's postSql inserts one row into qcdm_project_sycn_logs,
    // so comparing the row count before and after shows whether every job finished
    int start = logsService.count();
    File[] files = getFileLists(jsonPath);
    for (File file : files) {
        // Builds: python <DATAX_PATH> -p "<-D params>" <job file>
        CommandLine cmdLine = new CommandLine(paramService.getValue("PYTHON_PATH"));
        cmdLine.addArgument(paramService.getValue("DATAX_PATH"));
        cmdLine.addArgument("-p");
        // All -D pairs form ONE argument; handleQuoting=false stops Commons Exec from quoting it
        // e.g. "-Dwhere=QP201608 -Dprojectid=1232257586504736770 -DcreateUser=1123598821738675201 -DcreateDept=1123598813738675201"
        String params = "-Dwhere=" + projectBlock.getProjectCode()
                + " -Dprojectid=" + projectBlock.getProjectId()
                + " -DcreateUser=" + projectBlock.getCreateUser()
                + " -DcreateDept=" + projectBlock.getCreateDept();
        cmdLine.addArgument(params, false);
        cmdLine.addArgument(file.getAbsolutePath());

        DefaultExecutor executor = new DefaultExecutor();
        // Kill the DataX process if it runs longer than 10 minutes (600000 ms)
        executor.setWatchdog(new ExecuteWatchdog(600000));
        System.out.println("cmdLine: " + cmdLine);
        // execute() throws ExecuteException on a non-zero exit code (expected exit value defaults to 0)
        int exitValue = executor.execute(cmdLine);
        System.out.println("exitValue: " + exitValue);
    }
    int end = logsService.count();
    return end == start + files.length;
}
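The key detail in the code above is that all `-D` pairs are passed to `-p` as a single argument, which is why `addArgument(..., false)` is used. The stdlib-only sketch below builds the same argument list without Commons Exec, so it can be unit-tested. It assumes no value contains a space, since spaces are what separate the `-D` pairs inside the `-p` string; the class name, method name and job file path are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class DataxArgs {
    /**
     * Builds [python, datax.py, -p, "-Dk1=v1 -Dk2=v2", jobFile].
     * All -D pairs go into ONE element, matching addArgument(..., false) above.
     */
    public static List<String> build(String python, String dataxPy,
                                     Map<String, String> params, String jobFile) {
        StringJoiner joined = new StringJoiner(" ");
        for (Map.Entry<String, String> e : params.entrySet()) {
            String v = e.getValue();
            // A space would split one value into several -D options, so reject it early
            if (v == null || v.contains(" ")) {
                throw new IllegalArgumentException("bad value for " + e.getKey() + ": " + v);
            }
            joined.add("-D" + e.getKey() + "=" + v);
        }
        List<String> cmd = new ArrayList<>(List.of(python, dataxPy));
        if (!params.isEmpty()) {
            cmd.add("-p");
            cmd.add(joined.toString());
        }
        cmd.add(jobFile);
        return cmd;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the -D pairs in insertion order
        Map<String, String> p = new LinkedHashMap<>();
        p.put("where", "QP201608");
        p.put("projectid", "1232257586504736770");
        System.out.println(build("python", "datax/bin/datax.py", p, "job/blocklist.json"));
    }
}
```

The resulting list can be handed to `new ProcessBuilder(cmd)`, which passes each element to the process as a separate argument without going through a shell.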
Summary:
Tuning:
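Global tuning (the "core" block must come before the "job" block, at the same level). Here channel is the import concurrency and should be tuned to the server hardware, "record": -1 removes the limit on the number of rows read, "byte": -1 removes the limit on bytes, and batchSize is the size of each batch:
{
"core":{
"transport":{
"channel":{
"speed":{
"channel": 2,
"record":-1,
"byte":-1,
"batchSize":2048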
}
}
}
}
Job-level tuning (the last module of the file):
"setting": {
"speed": {
"channel": 2,
"record":-1,
"byte":-1,
"batchSize":2048
}
}
}
}
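Because the right channel count depends on the server hardware, one option is to derive it when generating the config. The sketch below assumes a hypothetical rule of one channel per CPU core, capped at a maximum you choose; this rule is an illustration, not a DataX recommendation. It renders only the channel, record and byte keys shown above.

```java
public class SpeedSetting {
    /** Hypothetical heuristic: one channel per CPU core, clamped to [1, cap]. */
    public static int suggestChannels(int cpuCores, int cap) {
        return Math.max(1, Math.min(cpuCores, cap));
    }

    /** Renders the job-level "setting" block; -1 leaves record/byte unlimited as in the text above. */
    public static String settingBlock(int channels) {
        return String.format(
            "\"setting\": {\"speed\": {\"channel\": %d, \"record\": -1, \"byte\": -1}}", channels);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(settingBlock(suggestChannels(cores, 8)));
    }
}
```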
JVM tuning (size the heap to the server's memory; never make it too large, or DataX fails straight away with an exception):
python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
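In a shell, the quotes in `--jvm="-Xms3G -Xmx3G"` only keep the two heap flags together as one argument. When the job is started from Java through `ProcessBuilder` (no shell involved), the quotes must be dropped and `--jvm=-Xms3G -Xmx3G` passed as a single element. The sketch below builds that command and adds a simple guard against an oversized heap; the 3G figure comes from the command above, while the helper names and the "at most half of total memory" limit are hypothetical.

```java
import java.util.List;

public class JvmFlags {
    /** Builds the argv for datax.py with equal -Xms/-Xmx, as in the command above. */
    public static List<String> command(String python, String dataxPy, int heapGb, String jobFile) {
        String heap = "-Xms" + heapGb + "G -Xmx" + heapGb + "G";
        // One element: ProcessBuilder does not use a shell, so no extra quotes are needed
        return List.of(python, dataxPy, "--jvm=" + heap, jobFile);
    }

    /** Hypothetical guard: refuse heaps larger than half of the machine's memory. */
    public static boolean heapFits(int heapGb, int totalMemoryGb) {
        return heapGb > 0 && heapGb * 2 <= totalMemoryGb;
    }

    public static void main(String[] args) {
        if (!heapFits(3, 8)) {
            throw new IllegalStateException("heap too large");
        }
        System.out.println(command("python", "datax.py", 3, "../job/test.json"));
    }
}
```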
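These are the tuning options; it should be possible to tune each JSON file individually.
Dynamic parameters
If many tables need to be imported and they all have the same structure, a single JSON file can be reused with different parameters. A simple example: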
python datax.py -p "-Dsdbname=test -Dstable=test" ../job/test.json
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://XXXXXX:XX/${sdbname}?characterEncoding=utf-8",
"table": ["${stable}"]
}
],
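With a template like this, importing many same-shaped tables becomes a loop that runs the same job file with different `-Dsdbname`/`-Dstable` values. The sketch below only builds one command line per table, mirroring the example command above; the table names are hypothetical, and actually running the commands (for example one at a time with `ProcessBuilder`) is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class TableBatch {
    /** Builds one datax.py command per table, all reusing the same job file. */
    public static List<List<String>> commands(String dataxPy, String jobFile,
                                              String dbName, List<String> tables) {
        List<List<String>> result = new ArrayList<>();
        for (String table : tables) {
            // Same shape as: python datax.py -p "-Dsdbname=test -Dstable=test" ../job/test.json
            String params = "-Dsdbname=" + dbName + " -Dstable=" + table;
            result.add(List.of("python", dataxPy, "-p", params, jobFile));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("orders", "customers"); // hypothetical table names
        for (List<String> cmd : commands("datax.py", "../job/test.json", "test", tables)) {
            System.out.println(String.join(" | ", cmd));
        }
    }
}
```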