一、需求:

    1. 使用滑动窗口(窗口长度1分钟,滑动步长20秒)统计窗口内数据的平均值、最大值和最小值

    二、实现:

    1. pom文件

    ```xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.macro.flinkavg</groupId>
        <artifactId>flinkavg</artifactId>
        <version>0.0.1</version>

        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>1.9.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.9.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.9.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.9.0</version>
            </dependency>
            <!--log4j2到slf4j桥梁-->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>2.9.1</version>
            </dependency>
            <!--kafka客户端-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.2.1</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    ```

    2. kafka配置文件(ProducerConf.properties)

    ```properties
    bootstrap.servers=cdh1.macro.com:9092
    acks=all
    retries=1
    batch.size=16384
    linger.ms=1
    buffer.memory=33554432
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    topic=test
    ```

    3. kafka生产者

    ```java
    package com.macro.flinkavg.util;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.io.*;
    import java.util.Properties;
    8. public class SendToProducerUtill {
    9. public static void main(String[] args) throws IOException {
    10. Properties properties = new Properties();
    11. FileInputStream fileInputStream = new FileInputStream(new File("E:\\JavaWorkSpace\\flinkavg\\src\\main\\resources\\ProducerConf.properties"));
    12. properties.load(fileInputStream);
    13. // 创建生产者对象
    14. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    15. // 发送数据
    16. // 创建流对象
    17. BufferedReader br = null;
    18. try {
    19. br = new BufferedReader(new FileReader("E:\\JavaWorkSpace\\flinkavg\\testData\\data"));//args[0] //"datas/userInfo.txt"
    20. // 定义字符串,保存读取的一行文字
    21. String line = null;
    22. // 循环读取,读取到最后返回null
    23. while ((line = br.readLine()) != null) {
    24. System.out.println("成功发送消息:"+line);
    25. producer.send(new ProducerRecord<String, String>(properties.getProperty("topic"), line));
    26. }
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. } finally {
    30. // 释放资源
    31. if (br != null) {
    32. try {
    33. br.close();
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. }
    37. }
    38. }
    39. // 关闭资源
    40. producer.close();
    41. }
    42. }
    ```

    4. 实体类

    ```java
    package com.macro.flinkavg.entity;

    /**
     * Mutable data holder for one reading: a timestamp, a fan name, a numeric value,
     * the year/month/day strings and a counter.
     *
     * <p>Provides a public no-argument constructor plus a getter and setter for every field.
     */
    public class PowerInfo {

        private Long time;
        private String fanName;
        private Double value;
        private String year;
        private String month;
        private String day;
        private Integer counts;

        /** Creates an instance with every field left {@code null}. */
        public PowerInfo() {
        }

        /**
         * Creates a fully populated instance.
         *
         * @param time    timestamp of the reading
         * @param fanName name of the fan
         * @param value   numeric value of the reading
         * @param year    year part
         * @param month   month part
         * @param day     day part
         * @param counts  counter attached to the reading
         */
        public PowerInfo(Long time, String fanName, Double value, String year, String month, String day, Integer counts) {
            this.time = time;
            this.fanName = fanName;
            this.value = value;
            this.year = year;
            this.month = month;
            this.day = day;
            this.counts = counts;
        }

        public Long getTime() {
            return time;
        }

        public void setTime(Long time) {
            this.time = time;
        }

        public String getFanName() {
            return fanName;
        }

        public void setFanName(String fanName) {
            this.fanName = fanName;
        }

        public Double getValue() {
            return value;
        }

        public void setValue(Double value) {
            this.value = value;
        }

        public String getYear() {
            return year;
        }

        public void setYear(String year) {
            this.year = year;
        }

        public String getMonth() {
            return month;
        }

        public void setMonth(String month) {
            this.month = month;
        }

        public String getDay() {
            return day;
        }

        public void setDay(String day) {
            this.day = day;
        }

        public Integer getCounts() {
            return counts;
        }

        public void setCounts(Integer counts) {
            this.counts = counts;
        }

        /** Returns all fields in the form {@code PowerInfo{time=..., fanName='...', ...}}. */
        @Override
        public String toString() {
            return "PowerInfo{" +
                    "time=" + time +
                    ", fanName='" + fanName + '\'' +
                    ", value=" + value +
                    ", year='" + year + '\'' +
                    ", month='" + month + '\'' +
                    ", day='" + day + '\'' +
                    ", counts=" + counts +
                    '}';
        }
    }

    ```

    5. 逻辑代码

    ```java
    package com.macro.flinkavg.flink;

    import com.macro.flinkavg.entity.PowerInfo;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import java.text.ParsePosition;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.Properties;
    22. public class VestoreDataAVGByFlink {
    23. public static void main(String[] args) throws Exception {
    24. // 1. 创建环境
    25. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    26. env.setParallelism(1);
    27. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    28. // 2. 读取kafka数据,创建 DataStream
    29. Properties properties = new Properties();
    30. properties.setProperty("bootstrap.servers", "cdh1.macro.com:9092");
    31. properties.setProperty("group.id", "test_group");
    32. properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    33. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    34. properties.setProperty("auto.offset.reset", "latest");
    35. DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
    36. // 3. 转换为 PowerInfo,并分配时间戳和 watermark
    37. DataStream<PowerInfo> dataStream = inputStream
    38. .map(line -> {
    39. String[] fields = line.split(",");
    40. Long timeSimple = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(fields[0], new ParsePosition(0)).getTime();
    41. return new PowerInfo(timeSimple, fields[1], new Double(fields[2]), fields[3], fields[4], fields[5], 1);
    42. })
    43. .assignTimestampsAndWatermarks(
    44. new AscendingTimestampExtractor<PowerInfo>() {
    45. @Override
    46. public long extractAscendingTimestamp(PowerInfo timeStamp) {
    47. return timeStamp.getTime();
    48. }
    49. });
    50. // 4. 分组开窗聚合,得到每个窗口内value的最大值最小值和平均值
    51. dataStream.keyBy("counts")
    52. .timeWindow(Time.minutes(1), Time.seconds(20))
    53. .apply( new VerstoreAggregate()).print();
    54. // 执行
    55. env.execute();
    56. }
    57. // 全窗口聚合
    58. public static class VerstoreAggregate implements WindowFunction<PowerInfo, Tuple4<Long, Double, Double, Double>, Tuple, TimeWindow> {
    59. ArrayList<Double> arrayList = new ArrayList<>();
    60. @Override
    61. public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<PowerInfo> iterable, Collector<Tuple4<Long, Double, Double, Double>> collector) throws Exception {
    62. Iterator<PowerInfo> iterator = iterable.iterator();
    63. while (iterator.hasNext()) {
    64. Double value = iterator.next().getValue();
    65. arrayList.add(value);
    66. }
    67. Double sum = 0.0;
    68. Double max = 0.0;
    69. Double min = 0.0;
    70. // 判断列表非空,以第一个元素作为最大值和最小值
    71. if ( ! arrayList.isEmpty()) {
    72. max = arrayList.get(0);
    73. min = arrayList.get(0);
    74. }
    75. // 循环遍历列表,取最大值最小值和和
    76. for (Double aDouble : arrayList) {
    77. //System.out.println("value:"+aDouble);
    78. sum += aDouble;
    79. if (max < aDouble) {
    80. max = aDouble;
    81. }
    82. if (min > aDouble) {
    83. min = aDouble;
    84. }
    85. }
    86. //System.out.println("count:"+arrayList.size());
    87. //System.out.println("{max:" + max + " min:" + min + " sum:" + sum + " endTime:" + timeWindow.getEnd()+"}");
    88. // 返回窗口内窗口结束时间,最大值,最小值,和平均值。
    89. collector.collect(new Tuple4<>(timeWindow.getEnd(),max,min,sum/arrayList.size()));
    90. arrayList.clear();
    91. }
    92. }
    93. }

    ```

    三、结果展示

    1. kafka生产数据

    image.png

    2. flink消费数据,展示结果

    33ad04bbf304b290514422e41250b83.png