1. Start Application
1.1 Request URL
http://{ip}:{port}/v1/jars/{project_id}/run
1.2 Request Method
POST
1.3 Request Headers
Content-Type: application/json;charset=UTF-8
1.4 Request Parameter Example
{
"entryClass" : "com.zxelec.fsdap.compute.etl.Application",
"programArgsList": [
"--appName",
"customer-app",
"--parallelism",
"1",
"--kafkaConsumer",
"{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"imsi\",\"group\":\"f11\"}",
"--duplicate",
"{\"isDuplicate\":\"true\",\"keys\":\"field1,field2\",\"duplicateTime\":60}",
"--sink",
"[{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"ff\",\"conditions\":[{\"key\":\"Method\",\"operator\":\"=\",\"value\":\"POST\"}]}]"
]
}
1.5 Request Parameter Description
参数名 (Parameter) | 类型 (Type) | 必填 (Required) | 默认值 (Default) | 说明 (Description) |
---|---|---|---|---|
entryClass | string | Y | | Entry class of the job |
programArgsList | array | Y | | Job arguments (see 1.5.1) |
1.5.1 programArgsList Parameter Description
This parameter is an array in which every two consecutive items form one key->value pair: the first item of the pair is the key and the second is its value (see the request sketch after the table below).
参数名 (Parameter) | 类型 (Type) | 必填 (Required) | 默认值 (Default) | 说明 (Description) |
---|---|---|---|---|
--appName | string | N | ETLApp | Job name |
--parallelism | string | N | 1 | Job parallelism |
--kafkaConsumer | object | Y | | Source Kafka configuration. bootstrap (required): Kafka brokers; topic (required): Kafka topic; group (required): Kafka consumer group |
--duplicate | object | Y | | Deduplication logic. isDuplicate (required): whether to deduplicate, true/false; keys (optional): field keys used for deduplication, comma-separated for multiple fields; duplicateTime (optional): deduplication interval in seconds (number type), defaults to 60s |
--sink | array | Y | | Filtering and output logic; each JSON object in the array is one filter/output rule. bootstrap (required): Kafka brokers; topic (required): Kafka topic; conditions (optional): filter conditions, an array in which each item is one condition with key: field name, operator: comparison operator, value: comparison value |
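A minimal client sketch for assembling the alternating key/value programArgsList and submitting the start request, assuming the Python requests library; the gateway address and {project_id} below are placeholders, and the Kafka settings are taken from the example in 1.4:

import json
import requests

# Placeholder gateway address and project id (assumptions, not fixed values).
BASE_URL = "http://192.168.1.10:8081"
PROJECT_ID = "my-project"

# Each logical argument becomes two consecutive array items: "--key" then its value.
# Object/array values (kafkaConsumer, duplicate, sink) are passed as JSON strings.
program_args = [
    "--appName", "customer-app",
    "--parallelism", "1",
    "--kafkaConsumer", json.dumps(
        {"bootstrap": "192.168.1.249:9092", "topic": "imsi", "group": "f11"}),
    "--duplicate", json.dumps(
        {"isDuplicate": "true", "keys": "field1,field2", "duplicateTime": 60}),
    "--sink", json.dumps([{
        "bootstrap": "192.168.1.249:9092",
        "topic": "ff",
        "conditions": [{"key": "Method", "operator": "=", "value": "POST"}],
    }]),
]

payload = {
    "entryClass": "com.zxelec.fsdap.compute.etl.Application",
    "programArgsList": program_args,
}

resp = requests.post(
    f"{BASE_URL}/v1/jars/{PROJECT_ID}/run",
    json=payload,  # sends Content-Type: application/json
    timeout=30,
)
resp.raise_for_status()
job_id = resp.json()["jobid"]  # used later by the cancel / detail endpoints
print("started job:", job_id)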
1.6 Response Example
{
"jobid": "6d42ddb9c3597aa88357ef56edba7a95"
}
2. Cancel Application
2.1 Request URL
http://{ip}:{port}/jobs/{jobid}
2.2 Request Method
PATCH
2.3 Response Example
{}
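A minimal cancellation sketch under the same assumptions (placeholder gateway address, jobid taken from the start response); a successful cancel returns the empty JSON object shown above:

import requests

BASE_URL = "http://192.168.1.10:8081"  # placeholder address (assumption)
job_id = "6d42ddb9c3597aa88357ef56edba7a95"  # jobid returned by the start request

# PATCH /jobs/{jobid} cancels the running job; a successful response body
# is an empty JSON object.
resp = requests.patch(f"{BASE_URL}/jobs/{job_id}", timeout=30)
resp.raise_for_status()
print(resp.json())  # {}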
3. View Application Details
3.1 Request URL
http://{ip}:{port}/jobs/{jobid}
3.2 Request Method
GET
3.3 Response Example
{
"jid": "107abed300910f0450ce4b09973592a4",
"name": "ETLApp",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1652248802057,
"end-time": -1,
"duration": 1156064,
"maxParallelism": -1,
"now": 1652249958121,
"timestamps": {
"SUSPENDED": 0,
"CREATED": 1652248802120,
"RESTARTING": 0,
"FAILED": 0,
"RUNNING": 1652248802171,
"FAILING": 0,
"INITIALIZING": 1652248802057,
"RECONCILING": 0,
"CANCELED": 0,
"FINISHED": 0,
"CANCELLING": 0
},
"vertices": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"name": "Source: Custom Source -> Timestamps/Watermarks",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1652248802262,
"end-time": -1,
"duration": 1155859,
"tasks": {
"CANCELED": 0,
"RUNNING": 1,
"SCHEDULED": 0,
"CREATED": 0,
"RECONCILING": 0,
"INITIALIZING": 0,
"DEPLOYING": 0,
"FINISHED": 0,
"CANCELING": 0,
"FAILED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 2088584254,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 2048046,
"write-records-complete": true
}
},
{
"id": "90bea66de1c231edf33913ecd54406c1",
"name": "KeyedProcess -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3<`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL>* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]) -> SinkConversionToTuple3 -> Sink: Unnamed",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1652248802265,
"end-time": -1,
"duration": 1155856,
"tasks": {
"CANCELED": 0,
"RUNNING": 1,
"SCHEDULED": 0,
"CREATED": 0,
"RECONCILING": 0,
"INITIALIZING": 0,
"DEPLOYING": 0,
"FINISHED": 0,
"CANCELING": 0,
"FAILED": 0
},
"metrics": {
"read-bytes": 2088611006,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 2048046,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CANCELED": 0,
"RUNNING": 2,
"SCHEDULED": 0,
"CREATED": 0,
"RECONCILING": 0,
"INITIALIZING": 0,
"DEPLOYING": 0,
"FINISHED": 0,
"CANCELING": 0,
"FAILED": 0
},
"plan": {
"jid": "107abed300910f0450ce4b09973592a4",
"name": "ETLApp",
"nodes": [
{
"id": "90bea66de1c231edf33913ecd54406c1",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "KeyedProcess -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3<`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL>* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> SinkConversionToTuple3 -> Sink: Unnamed",
"inputs": [
{
"num": 0,
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"ship_strategy": "HASH",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Source: Custom Source -> Timestamps/Watermarks",
"optimizer_properties": {}
}
]
}
}
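A minimal polling sketch, under the same placeholder assumptions, that reads the overall job state and per-vertex record counters from the detail response shown above:

import requests

BASE_URL = "http://192.168.1.10:8081"  # placeholder address (assumption)
job_id = "107abed300910f0450ce4b09973592a4"  # jobid of a submitted job

# GET /jobs/{jobid} returns the job detail document shown above.
detail = requests.get(f"{BASE_URL}/jobs/{job_id}", timeout=30).json()

print("job:", detail["name"], "state:", detail["state"])
for vertex in detail["vertices"]:
    metrics = vertex["metrics"]
    # read/write record counters per operator chain
    print(f'  {vertex["name"][:40]}: '
          f'read={metrics["read-records"]} written={metrics["write-records"]}')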