在实际应用中,可能遇到这样的场景:在Flink任务中用户想要在现有的Flink流式任务中,嵌入Alink模型进行预测,这就涉及到Flink与Alink的数据转换问题。本节将通过一个示例,展示各种转换操作。

Alink MLEnvironment

在本节的转换中,大家一定要使用Alink MLEnvironment,并通过它获取相应的StreamExecutionEnvironment和StreamTableEnvironment。代码如下:

  1. MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
  2. StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment();
  3. StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment();

构造一个Flink DataStream数据

示例代码如下,使用Map类型构造数据。

  1. DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource(
  2. new SourceFunction<Map <String, Object>>() {
  3. @Override
  4. public void run(SourceContext <Map <String, Object>> out) throws Exception {
  5. Map <String, Object> item = new HashMap<>();
  6. item.put("name", "a");
  7. item.put("val", 110);
  8. out.collect(item);
  9. Map <String, Object> item1 = new HashMap <>();
  10. item1.put("name", "b");
  11. item1.put("val", 111);
  12. out.collect(item1);
  13. Map <String, Object> item2 = new HashMap <>();
  14. item2.put("name", "c");
  15. item2.put("val", 113);
  16. out.collect(item2);
  17. }
  18. @Override
  19. public void cancel() {}
  20. });
  21. inputDataStreamMap.print();

输出信息如下:

  1. 10> {val=113, name=c}
  2. 8> {val=110, name=a}
  3. 9> {val=111, name=b}

Flink DataStream 转 Flink DataStream

使用Flink DataStream,T可以是任意泛型,但向Flink Table或Alink StreamOperator转换时,只能为Flink DataStream。其转换方法很直接,写个MapFunction就可以,示例代码如下所示:

  1. DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map <String, Object>, Row>() {
  2. @Override
  3. public Row map(Map <String, Object> value) throws Exception {
  4. return Row.of(value.get("name"), value.get("val"));
  5. }
  6. });
  7. inputDataStreamRow.print();

输出信息如下:

  1. 1> b,111
  2. 2> c,113
  3. 12> a,110

Flink DataStream 转 Flink Table

需要使用Alink提供的工具函数DataStreamConversionUtil.toTable(),各参数的意义比较明显,不再详细解释。具体代码如下:

  1. Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[] {"name", "val"},
  2. new TypeInformation<?>[] {AlinkTypes.STRING, AlinkTypes.INT});
  3. inputTable.printSchema();

打印Schema信息如下:

  1. root
  2. |-- name: STRING
  3. |-- val: INT

Flink Table 转 Alink StreamOperator

使用组件TableSourceStreamOp,可以实现Flink Table 到 Alink StreamOperator的转换。代码如下:

  1. TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable);

基于Alink StreamOperator,我们可以应用所有Alink算法组件,简单示例如下,对val列进行加1的操作,并增加一列,具体代码如下:

  1. StreamOperator<?> outputStreamOp = inputStreamOp
  2. .select("name, val + 1 AS val, 'output' AS type");
  3. outputStreamOp.print();

输出结果为:

  1. name|val|type
  2. ----|---|----
  3. a|111|output
  4. b|112|output
  5. c|114|output

Alink StreamOperator 转 Flink Table

Alink StreamOperator中有getOutputTable方法,可直接转换,代码如下:

  1. Table outputTable = outputStreamOp.getOutputTable();
  2. outputTable.printSchema();

打印Schema信息如下:

  1. root
  2. |-- name: STRING
  3. |-- val: INT
  4. |-- type: STRING