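Goal: read a CSV file, query it with the Table API / SQL, and write the result to a CSV file
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project
Add the dependencies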
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
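The demo reads a file named test.csv, whose contents are not shown in this post. A minimal sketch for generating a matching input file with plain JDK 8 code follows. The rows, the class name SampleCsvWriter, and the relative target path are all assumptions made for illustration; the only thing taken from the demo is the schema (id INT, name STRING).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original demo.
class SampleCsvWriter {

    // Hypothetical sample rows matching the demo schema: id INT, name STRING.
    // No header line is written, because the demo does not configure header skipping.
    static List<String> sampleRows() {
        return Arrays.asList("1,alice", "2,bob", "2,carol", "3,dave");
    }

    // Writes the sample rows to the given file, creating parent directories if needed.
    static Path writeSample(Path target) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        return Files.write(target, sampleRows(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumed location; adjust it to the path passed to FileSystem().path(...) in the demo.
        Path written = writeSample(Paths.get("src", "main", "resources", "test.csv"));
        System.out.println("Wrote " + written.toAbsolutePath());
    }
}
```

Run it once from the project root before starting the Flink job, or create the file by hand with the same two-column layout.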
Code demo
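import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvTableDemo { // the class name is arbitrary
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // one parallel task, so the sink writes a single out.csv file
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the input CSV file as a temporary table
        tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\test.csv"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.INT())
                        .field("name", DataTypes.STRING())
                )
                .createTemporaryTable("inputTable");

        // Table API: a simple filter, then a grouped count
        Table table = tableEnv.from("inputTable").select("id,name").filter("id === 1");
        Table table1 = tableEnv.from("inputTable").groupBy("id").select("id,id.count as count");
        // SQL: the same two kinds of query
        Table sql = tableEnv.sqlQuery("select id, name from inputTable where id = 2");
        Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");

        // Register the output CSV file as a temporary table
        tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\out.csv"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.INT())
                        .field("name", DataTypes.STRING())
                )
                .createTemporaryTable("outTable");

        // Write the result of the filter query to the sink, then run the job
        sql.insertInto("outTable");
        env.execute();
    }
}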
After the job runs, an out.csv file is generated in the resources folder.
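To sanity-check what ends up in out.csv, the query that is written to the sink (select id, name ... where id = 2) can be mimicked in plain Java. This is only a sketch: the input rows are hypothetical, ExpectedOutputSketch and filterById are illustrative names, and the parsing assumes simple comma-separated rows without quoting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original demo and not a Flink API.
class ExpectedOutputSketch {

    // Keeps the "id,name" lines whose id equals wantedId, mirroring
    // "select id, name from inputTable where id = 2" for simple comma-separated rows.
    static List<String> filterById(List<String> lines, int wantedId) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",", 2);
            if (fields.length == 2 && Integer.parseInt(fields[0].trim()) == wantedId) {
                kept.add(fields[0].trim() + "," + fields[1]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical input rows; replace them with the lines of your own test.csv.
        List<String> input = Arrays.asList("1,alice", "2,bob", "2,carol", "3,dave");
        System.out.println(filterById(input, 2)); // prints [2,bob, 2,carol]
    }
}
```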
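Note
out.csv does not exist before the job runs; the sink creates it. The CSV file sink is append-only: it accepts inserts but not retractions or upserts, so it does not fit scenarios that need those modes.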
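This restriction is why the demo writes the filter query (sql) rather than the grouped count (sql1). A filter only ever adds rows, while a streaming count has to replace an earlier result each time another row with the same id arrives. The plain-Java sketch below simulates the change messages such a count would produce; it is an illustration only, not Flink code, and ChangelogSketch, countChangelog, and the input ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not part of the original demo and not a Flink API.
class ChangelogSketch {

    // Simulates the change messages a streaming "count(id) ... group by id" emits.
    // "+" marks an added row, "-" marks the retraction of a previously emitted row.
    static List<String> countChangelog(int[] ids) {
        Map<Integer, Long> counts = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (int id : ids) {
            Long previous = counts.get(id);
            if (previous != null) {
                // The old count is no longer valid, so it is withdrawn first.
                changes.add("-(" + id + "," + previous + ")");
            }
            long updated = (previous == null) ? 1L : previous + 1L;
            counts.put(id, updated);
            changes.add("+(" + id + "," + updated + ")");
        }
        return changes;
    }

    public static void main(String[] args) {
        // Hypothetical id column: 1, 2, 2, 3
        System.out.println(countChangelog(new int[] {1, 2, 2, 3}));
        // prints [+(1,1), +(2,1), -(2,1), +(2,2), +(3,1)]
    }
}
```

The "-(2,1)" entry is the kind of message an append-only file cannot express, since a line that has already been written cannot be taken back.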