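Goal: read a CSV file, query it with the Table API and SQL, and write the result to a CSV file
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project
Add the dependencies to pom.xml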
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
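Code demo
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the source table backed by test.csv
        tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\test.csv"))
            .withFormat(new Csv())
            .withSchema(new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING()))
            .createTemporaryTable("inputTable");

        // Table API: a filter query and a grouped count
        Table table = tableEnv.from("inputTable").select("id,name").filter("id === 1");
        Table table1 = tableEnv.from("inputTable").groupBy("id").select("id,id.count as count");

        // SQL: a filter query and a grouped count
        Table sql = tableEnv.sqlQuery("select id, name from inputTable where id = 2");
        Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");

        // Register the sink table backed by out.csv
        tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\out.csv"))
            .withFormat(new Csv())
            .withSchema(new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING()))
            .createTemporaryTable("outTable");

        // Only the SQL filter result is written to the sink
        sql.insertInto("outTable");
        env.execute();
    }
}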
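After the job runs, out.csv appears in the resources folder
Notes
out.csv does not exist before the first run; the job creates it. This file sink only supports insert (append-only) output. It cannot process retractions or upserts, so it does not cover scenarios that need those modes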
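The demo hard-codes absolute Windows paths, which only resolve on the original machine. A minimal sketch of one alternative, assuming the job is started with the project root as its working directory: build the path from src/main/resources instead. ResourcePaths and resourcePath are hypothetical names used for illustration, not part of Flink.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class ResourcePaths {
    // Hypothetical helper: resolves a file name against src/main/resources
    // under the current working directory (assumed to be the project root).
    static String resourcePath(String fileName) {
        Path p = Paths.get("src", "main", "resources", fileName);
        return p.toAbsolutePath().normalize().toString();
    }

    public static void main(String[] args) {
        // Prints the absolute locations the demo would read from and write to.
        System.out.println(resourcePath("test.csv"));
        System.out.println(resourcePath("out.csv"));
    }
}
```

The returned string could then be passed to new FileSystem().path(...) in place of the literal path.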
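To see why the note above matters, consider the grouped count (sql1 in the demo). Each new input row can change a count that was already emitted, so the result is not append-only. The following is a plain-Java model of that behaviour, not Flink code: RetractDemo and countChangelog are hypothetical names, and the "+" / "-" prefixes are just an illustrative encoding of "add" and "retract" messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetractDemo {
    // Models the changelog of "select id, count(id) from t group by id".
    // "+ id,cnt" adds a result row; "- id,cnt" retracts a previously emitted one.
    static List<String> countChangelog(int[] ids) {
        Map<Integer, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            Long old = counts.get(id);
            if (old != null) {
                // The earlier count for this id is now stale: withdraw it.
                out.add("- " + id + "," + old);
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(id, updated);
            out.add("+ " + id + "," + updated);
        }
        return out;
    }

    public static void main(String[] args) {
        // Input ids 1, 2, 2: the second "2" forces a retraction of "2,1".
        countChangelog(new int[] {1, 2, 2}).forEach(System.out::println);
    }
}
```

An append-only file sink has no way to apply the "-" messages, which is why the demo writes only the filter query (sql) to outTable and leaves the grouped count unused.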
