1. Start Application

1.1 Request URL

http://{ip}:{port}/v1/jars/{project_id}/run

1.2 Request Method

POST

1.3 Request Headers

Content-Type: application/json;charset=UTF-8

1.4 Request Body Example

    {
      "entryClass": "com.zxelec.fsdap.compute.etl.Application",
      "programArgsList": [
        "--appName",
        "customer-app",
        "--parallelism",
        "1",
        "--kafkaConsumer",
        "{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"imsi\",\"group\":\"f11\"}",
        "--duplicate",
        "{\"isDuplicate\":\"true\",\"keys\":\"field1,field2\",\"duplicateTime\":60}",
        "--sink",
        "[{\"bootstrap\":\"192.168.1.249:9092\",\"topic\":\"ff\",\"conditions\":[{\"key\":\"Method\",\"operator\":\"=\",\"value\":\"POST\"}]}]"
      ]
    }

1.5 Request Parameters

Parameter       | Type   | Required | Default | Description
entryClass      | string | Y        |         | Fully qualified entry class (main class) of the job
programArgsList | array  | Y        |         | Job arguments; see 1.5.1
1.5.1 programArgsList Parameters

This parameter is an array in which every two consecutive items form one key->value pair: the first item of the pair is the key, the second is its value. Object- and array-typed values are passed as JSON-encoded strings, as in the example in 1.4; a short sketch of building the array follows the table below.

Parameter       | Type   | Required | Default | Description
--appName       | string | N        | ETLApp  | Job name
--parallelism   | string | N        | 1       | Job parallelism
--kafkaConsumer | object | Y        |         | Kafka source definition; fields below
--duplicate     | object | Y        |         | Deduplication logic; fields below
--sink          | array  | Y        |         | Filtering and output logic; each JSON object in the array is one filter/output target; fields below

--kafkaConsumer fields:
  bootstrap (required): Kafka brokers
  topic (required): Kafka topic
  group (required): Kafka consumer group

--duplicate fields:
  isDuplicate (required): whether to deduplicate, true/false
  keys (optional): field keys to deduplicate on; join multiple keys with commas
  duplicateTime (optional): deduplication window in seconds (number); default 60

--sink fields (per array item):
  bootstrap (required): Kafka brokers
  topic (required): Kafka topic
  conditions (optional): filter rules, an array in which each item is one condition: key is the field name, operator the comparison operator, value the comparand
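
As a minimal sketch of how the pair structure maps onto the array, the Python snippet below builds programArgsList from the example values in 1.4; the variable names are illustrative only, and json.dumps encodes the object/array values as JSON strings:

    import json

    # Values taken from the request example in section 1.4; names are illustrative.
    kafka_consumer = {"bootstrap": "192.168.1.249:9092", "topic": "imsi", "group": "f11"}
    duplicate = {"isDuplicate": "true", "keys": "field1,field2", "duplicateTime": 60}
    sink = [{"bootstrap": "192.168.1.249:9092", "topic": "ff",
             "conditions": [{"key": "Method", "operator": "=", "value": "POST"}]}]

    # Every two consecutive items form one key->value pair; object- and
    # array-typed values are passed as JSON-encoded strings.
    program_args_list = [
        "--appName", "customer-app",
        "--parallelism", "1",
        "--kafkaConsumer", json.dumps(kafka_consumer),
        "--duplicate", json.dumps(duplicate),
        "--sink", json.dumps(sink),
    ]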

1.6 Response Example

    {
      "jobid": "6d42ddb9c3597aa88357ef56edba7a95"
    }
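
A minimal launch sketch in Python with the requests library; BASE_URL and PROJECT_ID are assumptions to be replaced with your actual {ip}:{port} and {project_id}:

    import json
    import requests

    BASE_URL = "http://192.168.1.249:8081"  # assumption: your {ip}:{port}
    PROJECT_ID = "demo-project"             # assumption: your {project_id}

    body = {
        "entryClass": "com.zxelec.fsdap.compute.etl.Application",
        "programArgsList": [
            "--appName", "customer-app",
            "--parallelism", "1",
            "--kafkaConsumer", json.dumps({"bootstrap": "192.168.1.249:9092",
                                           "topic": "imsi", "group": "f11"}),
            "--duplicate", json.dumps({"isDuplicate": "true",
                                       "keys": "field1,field2", "duplicateTime": 60}),
            "--sink", json.dumps([{"bootstrap": "192.168.1.249:9092", "topic": "ff",
                                   "conditions": [{"key": "Method", "operator": "=",
                                                   "value": "POST"}]}]),
        ],
    }

    resp = requests.post(
        f"{BASE_URL}/v1/jars/{PROJECT_ID}/run",
        json=body,
        headers={"Content-Type": "application/json;charset=UTF-8"},  # per section 1.3
        timeout=30,
    )
    resp.raise_for_status()
    job_id = resp.json()["jobid"]  # e.g. "6d42ddb9c3597aa88357ef56edba7a95"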

2. Cancel Application

2.1 Request URL

http://{ip}:{port}/jobs/{jobid}

2.2 Request Method

PATCH

2.3 Response Example

    {}
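
A minimal cancellation sketch under the same assumptions as in section 1 (placeholder BASE_URL; the job id comes from the launch response in 1.6):

    import requests

    BASE_URL = "http://192.168.1.249:8081"       # assumption: your {ip}:{port}
    JOB_ID = "6d42ddb9c3597aa88357ef56edba7a95"  # jobid returned in 1.6

    # PATCH /jobs/{jobid} cancels the running job; the request has no body.
    resp = requests.patch(f"{BASE_URL}/jobs/{JOB_ID}", timeout=30)
    resp.raise_for_status()
    print(resp.json())  # an empty object {} on success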

3. View Application Details

3.1 Request URL

http://{ip}:{port}/jobs/{jobid}

3.2 Request Method

GET

3.3 Response Example

    {
      "jid": "107abed300910f0450ce4b09973592a4",
      "name": "ETLApp",
      "isStoppable": false,
      "state": "RUNNING",
      "start-time": 1652248802057,
      "end-time": -1,
      "duration": 1156064,
      "maxParallelism": -1,
      "now": 1652249958121,
      "timestamps": {
        "SUSPENDED": 0,
        "CREATED": 1652248802120,
        "RESTARTING": 0,
        "FAILED": 0,
        "RUNNING": 1652248802171,
        "FAILING": 0,
        "INITIALIZING": 1652248802057,
        "RECONCILING": 0,
        "CANCELED": 0,
        "FINISHED": 0,
        "CANCELLING": 0
      },
      "vertices": [
        {
          "id": "cbc357ccb763df2852fee8c4fc7d55f2",
          "name": "Source: Custom Source -> Timestamps/Watermarks",
          "maxParallelism": 128,
          "parallelism": 1,
          "status": "RUNNING",
          "start-time": 1652248802262,
          "end-time": -1,
          "duration": 1155859,
          "tasks": {
            "CANCELED": 0,
            "RUNNING": 1,
            "SCHEDULED": 0,
            "CREATED": 0,
            "RECONCILING": 0,
            "INITIALIZING": 0,
            "DEPLOYING": 0,
            "FINISHED": 0,
            "CANCELING": 0,
            "FAILED": 0
          },
          "metrics": {
            "read-bytes": 0,
            "read-bytes-complete": true,
            "write-bytes": 2088584254,
            "write-bytes-complete": true,
            "read-records": 0,
            "read-records-complete": true,
            "write-records": 2048046,
            "write-records-complete": true
          }
        },
        {
          "id": "90bea66de1c231edf33913ecd54406c1",
          "name": "KeyedProcess -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3<`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL>* NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]) -> SinkConversionToTuple3 -> Sink: Unnamed",
          "maxParallelism": 128,
          "parallelism": 1,
          "status": "RUNNING",
          "start-time": 1652248802265,
          "end-time": -1,
          "duration": 1155856,
          "tasks": {
            "CANCELED": 0,
            "RUNNING": 1,
            "SCHEDULED": 0,
            "CREATED": 0,
            "RECONCILING": 0,
            "INITIALIZING": 0,
            "DEPLOYING": 0,
            "FINISHED": 0,
            "CANCELING": 0,
            "FAILED": 0
          },
          "metrics": {
            "read-bytes": 2088611006,
            "read-bytes-complete": true,
            "write-bytes": 0,
            "write-bytes-complete": true,
            "read-records": 2048046,
            "read-records-complete": true,
            "write-records": 0,
            "write-records-complete": true
          }
        }
      ],
      "status-counts": {
        "CANCELED": 0,
        "RUNNING": 2,
        "SCHEDULED": 0,
        "CREATED": 0,
        "RECONCILING": 0,
        "INITIALIZING": 0,
        "DEPLOYING": 0,
        "FINISHED": 0,
        "CANCELING": 0,
        "FAILED": 0
      },
      "plan": {
        "jid": "107abed300910f0450ce4b09973592a4",
        "name": "ETLApp",
        "nodes": [
          {
            "id": "90bea66de1c231edf33913ecd54406c1",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "KeyedProcess -&gt; DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_4, type=*org.apache.flink.api.java.tuple.Tuple3&lt;`f0` STRING, `f1` STRING, `f2` BIGINT NOT NULL&gt;* NOT NULL, rowtime=false, watermark=false) -&gt; Calc(select=[f0 AS kafka_key, f1 AS kafka_value, f2 AS kafka_timestamp], where=[(get_json_value(f1, _UTF-16LE'Method') = _UTF-16LE'POST':VARCHAR(2147483647) CHARACTER SET &quot;UTF-16LE&quot;)]) -&gt; SinkConversionToTuple3 -&gt; Sink: Unnamed",
            "inputs": [
              {
                "num": 0,
                "id": "cbc357ccb763df2852fee8c4fc7d55f2",
                "ship_strategy": "HASH",
                "exchange": "pipelined_bounded"
              }
            ],
            "optimizer_properties": {}
          },
          {
            "id": "cbc357ccb763df2852fee8c4fc7d55f2",
            "parallelism": 1,
            "operator": "",
            "operator_strategy": "",
            "description": "Source: Custom Source -&gt; Timestamps/Watermarks",
            "optimizer_properties": {}
          }
        ]
      }
    }
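
A minimal polling sketch under the same assumptions (placeholder BASE_URL; the job id comes from the launch response), printing the overall state and the per-vertex record counts shown in the example above:

    import requests

    BASE_URL = "http://192.168.1.249:8081"       # assumption: your {ip}:{port}
    JOB_ID = "107abed300910f0450ce4b09973592a4"  # jobid of a submitted job

    detail = requests.get(f"{BASE_URL}/jobs/{JOB_ID}", timeout=30).json()

    print(detail["state"])  # e.g. "RUNNING"
    for vertex in detail["vertices"]:
        metrics = vertex["metrics"]
        print(vertex["name"][:60], vertex["status"],
              metrics["read-records"], metrics["write-records"])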