在实际应用中,可能遇到这样的场景:在Flink任务中用户想要在现有的Flink流式任务中,嵌入Alink模型进行预测,这就涉及到Flink与Alink的数据转换问题。本节将通过一个示例,展示各种转换操作。
Alink MLEnvironment
在本节的转换中,大家一定要使用Alink MLEnvironment,并通过它获取相应的StreamExecutionEnvironment和StreamTableEnvironment。代码如下:
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment();
StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment();
构造一个Flink DataStream数据
示例代码如下,使用Map类型构造数据。
DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource(
new SourceFunction<Map <String, Object>>() {
@Override
public void run(SourceContext <Map <String, Object>> out) throws Exception {
Map <String, Object> item = new HashMap<>();
item.put("name", "a");
item.put("val", 110);
out.collect(item);
Map <String, Object> item1 = new HashMap <>();
item1.put("name", "b");
item1.put("val", 111);
out.collect(item1);
Map <String, Object> item2 = new HashMap <>();
item2.put("name", "c");
item2.put("val", 113);
out.collect(item2);
}
@Override
public void cancel() {}
});
inputDataStreamMap.print();
输出信息如下:
10> {val=113, name=c}
8> {val=110, name=a}
9> {val=111, name=b}
Flink DataStream 转 Flink DataStream|
使用Flink DataStream
DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map <String, Object>, Row>() {
@Override
public Row map(Map <String, Object> value) throws Exception {
return Row.of(value.get("name"), value.get("val"));
}
});
inputDataStreamRow.print();
输出信息如下:
1> b,111
2> c,113
12> a,110
Flink DataStream 转 Flink Table
需要使用Alink提供的工具函数DataStreamConversionUtil.toTable(),各参数的意义比较明显,不再详细解释。具体代码如下:
Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[] {"name", "val"},
new TypeInformation<?>[] {AlinkTypes.STRING, AlinkTypes.INT});
inputTable.printSchema();
打印Schema信息如下:
root
|-- name: STRING
|-- val: INT
Flink Table 转 Alink StreamOperator
使用组件TableSourceStreamOp,可以实现Flink Table 到 Alink StreamOperator的转换。代码如下:
TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable);
基于Alink StreamOperator,我们可以应用所有Alink算法组件,简单示例如下,对val列进行加1的操作,并增加一列,具体代码如下:
StreamOperator<?> outputStreamOp = inputStreamOp
.select("name, val + 1 AS val, 'output' AS type");
outputStreamOp.print();
输出结果为:
name|val|type
----|---|----
a|111|output
b|112|output
c|114|output
Alink StreamOperator 转 Flink Table
Alink StreamOperator中有getOutputTable方法,可直接转换,代码如下:
Table outputTable = outputStreamOp.getOutputTable();
outputTable.printSchema();
打印Schema信息如下:
root
|-- name: STRING
|-- val: INT
|-- type: STRING