I. Requirements:
- Every 20 seconds, read the last 1 minute of data through a sliding window and compute the average, maximum, and minimum (see the sketch of the window semantics below).
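To make the window semantics concrete: with a 1-minute window sliding every 20 seconds, each record falls into 60/20 = 3 overlapping windows, and a result is emitted every 20 seconds. Below is a minimal standalone sketch of the assignment arithmetic (my own illustration, mirroring how Flink assigns a timestamp to sliding event-time windows with zero offset; the timestamp value is hypothetical):

```java
// Which sliding windows (size 60 s, slide 20 s) contain a given epoch-millis timestamp.
public class WindowAssignmentSketch {
    public static void main(String[] args) {
        long size = 60_000L;
        long slide = 20_000L;
        long ts = 1_571_652_030_000L; // hypothetical record timestamp

        // Latest window start that still covers ts, then step backwards by the slide.
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            System.out.println("window [" + start + ", " + (start + size) + ")");
        }
        // Prints three half-open windows, one per slide step that covers ts.
    }
}
```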
II. Implementation:
1. The pom file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.macro.flinkavg</groupId>
    <artifactId>flinkavg</artifactId>
    <version>0.0.1</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- log4j2-to-slf4j bridge -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.9.1</version>
        </dependency>
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
2. Kafka producer configuration file (ProducerConf.properties)
```properties
bootstrap.servers=cdh1.macro.com:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
topic=test
```
3. Kafka producer

```java
package com.macro.flinkavg.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

public class SendToProducerUtill {
    public static void main(String[] args) throws IOException {
        // Load the producer configuration
        Properties properties = new Properties();
        FileInputStream fileInputStream = new FileInputStream(
                new File("E:\\JavaWorkSpace\\flinkavg\\src\\main\\resources\\ProducerConf.properties"));
        properties.load(fileInputStream);

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // Read the data file line by line and send each line as one record
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("E:\\JavaWorkSpace\\flinkavg\\testData\\data")); // or args[0], e.g. "datas/userInfo.txt"
            String line = null;
            // readLine() returns null at end of file
            while ((line = br.readLine()) != null) {
                System.out.println("Sent message: " + line);
                producer.send(new ProducerRecord<String, String>(properties.getProperty("topic"), line));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the reader
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // close() also flushes any buffered records
        producer.close();
    }
}
```
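The contents of the data file are not shown in the post; judging from the map function in step 5 (which splits each line on commas, parses field 0 as `yyyy-MM-dd HH:mm:ss`, and field 2 as a double), each line presumably looks something like this (hypothetical values):

```
2019-10-21 10:00:00,fan1,23.5,2019,10,21
2019-10-21 10:00:05,fan1,24.1,2019,10,21
2019-10-21 10:00:10,fan1,22.8,2019,10,21
```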
4. Entity class

```java
package com.macro.flinkavg.entity;

public class PowerInfo {
    private Long time;
    private String fanName;
    private Double value;
    private String year;
    private String month;
    private String day;
    private Integer counts;

    public PowerInfo() {
    }

    public PowerInfo(Long time, String fanName, Double value, String year, String month, String day, Integer counts) {
        this.time = time;
        this.fanName = fanName;
        this.value = value;
        this.year = year;
        this.month = month;
        this.day = day;
        this.counts = counts;
    }

    public Long getTime() { return time; }
    public void setTime(Long time) { this.time = time; }
    public String getFanName() { return fanName; }
    public void setFanName(String fanName) { this.fanName = fanName; }
    public Double getValue() { return value; }
    public void setValue(Double value) { this.value = value; }
    public String getYear() { return year; }
    public void setYear(String year) { this.year = year; }
    public String getMonth() { return month; }
    public void setMonth(String month) { this.month = month; }
    public String getDay() { return day; }
    public void setDay(String day) { this.day = day; }
    public Integer getCounts() { return counts; }
    public void setCounts(Integer counts) { this.counts = counts; }

    @Override
    public String toString() {
        return "PowerInfo{" +
                "time=" + time +
                ", fanName='" + fanName + '\'' +
                ", value=" + value +
                ", year='" + year + '\'' +
                ", month='" + month + '\'' +
                ", day='" + day + '\'' +
                ", counts=" + counts +
                '}';
    }
}
```
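Note the `counts` field: the job in step 5 sets it to the constant 1 for every record and then calls `keyBy("counts")`, so all records land on one key and are aggregated together in a single window. A hypothetical parsed line maps onto the constructor like this (illustration only; the millisecond value depends on the local timezone):

```java
// Hypothetical input line: "2019-10-21 10:00:00,fan1,23.5,2019,10,21"
PowerInfo info = new PowerInfo(
        1571623200000L, // epoch millis parsed from fields[0] (timezone-dependent)
        "fan1",         // fanName
        23.5,           // value
        "2019", "10", "21",
        1               // counts: constant key so all records share one window
);
```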
5. Job logic

```java
package com.macro.flinkavg.flink;

import com.macro.flinkavg.entity.PowerInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Properties;

public class VestoreDataAVGByFlink {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment with event-time semantics
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Read from Kafka and create the DataStream
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh1.macro.com:9092");
        properties.setProperty("group.id", "test_group");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream<String> inputStream = env.addSource(
                new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));

        // 3. Map to PowerInfo and assign timestamps and watermarks
        DataStream<PowerInfo> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            Long timeSimple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .parse(fields[0], new ParsePosition(0)).getTime();
            return new PowerInfo(timeSimple, fields[1], Double.parseDouble(fields[2]),
                    fields[3], fields[4], fields[5], 1);
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<PowerInfo>() {
            @Override
            public long extractAscendingTimestamp(PowerInfo powerInfo) {
                return powerInfo.getTime();
            }
        });

        // 4. Key, window, and aggregate: max, min, and average per window
        dataStream.keyBy("counts")
                .timeWindow(Time.minutes(1), Time.seconds(20))
                .apply(new VerstoreAggregate())
                .print();

        // Run the job
        env.execute();
    }

    // Full-window aggregation over all elements of each window
    public static class VerstoreAggregate
            implements WindowFunction<PowerInfo, Tuple4<Long, Double, Double, Double>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<PowerInfo> iterable,
                          Collector<Tuple4<Long, Double, Double, Double>> collector) throws Exception {
            // Collect the window's values into a local list
            ArrayList<Double> arrayList = new ArrayList<>();
            for (PowerInfo info : iterable) {
                arrayList.add(info.getValue());
            }
            if (arrayList.isEmpty()) {
                return; // nothing to emit for an empty window
            }
            // Initialize max/min with the first element, then scan
            double sum = 0.0;
            double max = arrayList.get(0);
            double min = arrayList.get(0);
            for (Double aDouble : arrayList) {
                sum += aDouble;
                if (max < aDouble) max = aDouble;
                if (min > aDouble) min = aDouble;
            }
            // Emit (window end time, max, min, average)
            collector.collect(new Tuple4<>(timeWindow.getEnd(), max, min, sum / arrayList.size()));
        }
    }
}
```
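The WindowFunction above buffers every element of a window before computing the statistics. For longer windows, an incremental AggregateFunction keeps only a constant-size accumulator per window instead. Here is a minimal sketch (the `PowerStats` and `Acc` names are my own, not from the original post):

```java
package com.macro.flinkavg.flink;

import com.macro.flinkavg.entity.PowerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Incremental aggregator: tracks (sum, count, max, min) as records arrive,
// so memory use per window stays constant regardless of window size.
public class PowerStats implements AggregateFunction<PowerInfo, PowerStats.Acc, Tuple3<Double, Double, Double>> {

    // Mutable accumulator holding the running statistics.
    public static class Acc {
        public double sum = 0.0;
        public long count = 0;
        public double max = Double.NEGATIVE_INFINITY;
        public double min = Double.POSITIVE_INFINITY;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Acc add(PowerInfo value, Acc acc) {
        double v = value.getValue();
        acc.sum += v;
        acc.count++;
        acc.max = Math.max(acc.max, v);
        acc.min = Math.min(acc.min, v);
        return acc;
    }

    @Override
    public Tuple3<Double, Double, Double> getResult(Acc acc) {
        // (max, min, average)
        return new Tuple3<>(acc.max, acc.min, acc.sum / acc.count);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        a.sum += b.sum;
        a.count += b.count;
        a.max = Math.max(a.max, b.max);
        a.min = Math.min(a.min, b.min);
        return a;
    }
}
```

It would plug in via `.timeWindow(Time.minutes(1), Time.seconds(20)).aggregate(new PowerStats())`; combining it with a ProcessWindowFunction would restore the window-end timestamp that the original output includes.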
III. Results
- Kafka producer sends the data

- Flink consumes the data and prints the results

