I. Requirements:
- Using a sliding window, every 20 seconds read the last 1 minute of data and compute the average, maximum, and minimum. With a 60 s window and a 20 s slide, each record falls into 60/20 = 3 overlapping windows (see the sketch below).
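For reference, here is a minimal standalone sketch (plain Java, no Flink dependency) of that window assignment, mirroring what Flink's SlidingEventTimeWindows does with offset 0; the event timestamp is made up for illustration:
```java
public class SlidingWindowDemo {
    public static void main(String[] args) {
        long windowSize = 60_000L; // 1 minute, in ms
        long slide = 20_000L;      // 20 seconds, in ms
        long t = 125_000L;         // hypothetical event timestamp in ms
        // Start of the latest window containing t, then step back by the slide
        long lastStart = t - (t % slide);
        for (long start = lastStart; start > t - windowSize; start -= slide) {
            System.out.println("window [" + start + ", " + (start + windowSize) + ")");
        }
        // Prints three windows, each of which contains t
    }
}
```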
II. Implementation:
1. pom file
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.macro.flinkavg</groupId>
<artifactId>flinkavg</artifactId>
<version>0.0.1</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.9.0</version>
</dependency>
<!-- SLF4J binding for Log4j 2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.9.1</version>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
```
2. Kafka configuration file (ProducerConf.properties)
```properties
bootstrap.servers=cdh1.macro.com:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
topic=test
```
3. Kafka producer
```java
package com.macro.flinkavg.util;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.Properties;
public class SendToProducerUtill {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream(new File("E:\\JavaWorkSpace\\flinkavg\\src\\main\\resources\\ProducerConf.properties"));
properties.load(fileInputStream);
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// Send the data
// Create a reader over the test data file
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("E:\\JavaWorkSpace\\flinkavg\\testData\\data")); // or take the path from args[0]
// Holds the line just read
String line = null;
// Loop until readLine() returns null at end of file
while ((line = br.readLine()) != null) {
System.out.println("Sent message: " + line);
producer.send(new ProducerRecord<String, String>(properties.getProperty("topic"), line));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// Release the reader
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// Close the producer
producer.close();
}
}
```
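The Flink job in step 5 splits each line on commas into (timestamp, fanName, value, year, month, day), so each line of the test data file is expected to look like the following made-up sample (the actual test file is not shown here):
```
2019-10-21 10:00:00,fan_1,23.5,2019,10,21
```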
4. Entity class
```java
package com.macro.flinkavg.entity;

public class PowerInfo {
private Long time;
private String fanName;
private Double value;
private String year;
private String month;
private String day;
private Integer counts;
public PowerInfo() { }
public PowerInfo(Long time, String fanName, Double value, String year, String month, String day, Integer counts) {
this.time = time;
this.fanName = fanName;
this.value = value;
this.year = year;
this.month = month;
this.day = day;
this.counts = counts;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getFanName() {
return fanName;
}
public void setFanName(String fanName) {
this.fanName = fanName;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public String getMonth() {
return month;
}
public void setMonth(String month) {
this.month = month;
}
public String getDay() {
return day;
}
public void setDay(String day) {
this.day = day;
}
public Integer getCounts() {
return counts;
}
public void setCounts(Integer counts) {
this.counts = counts;
}
@Override
public String toString() {
return "PowerInfo{" +
"time=" + time +
", fanName='" + fanName + '\'' +
", value=" + value +
", year='" + year + '\'' +
", month='" + month + '\'' +
", day='" + day + '\'' +
", counts=" + counts +
'}';
}
}
```
5. Processing logic
```java
package com.macro.flinkavg.flink;
import com.macro.flinkavg.entity.PowerInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Properties;
public class VestoreDataAVGByFlink {
public static void main(String[] args) throws Exception {
// 1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. Read from Kafka and create a DataStream
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "cdh1.macro.com:9092");
properties.setProperty("group.id", "test_group");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("auto.offset.reset", "latest");
DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
// 3. Map to PowerInfo and assign timestamps and watermarks
DataStream<PowerInfo> dataStream = inputStream
.map(line -> {
String[] fields = line.split(",");
// Parse the "yyyy-MM-dd HH:mm:ss" timestamp into epoch milliseconds
Long timeSimple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(fields[0], new ParsePosition(0)).getTime();
return new PowerInfo(timeSimple, fields[1], Double.parseDouble(fields[2]), fields[3], fields[4], fields[5], 1);
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<PowerInfo>() {
@Override
public long extractAscendingTimestamp(PowerInfo timeStamp) {
return timeStamp.getTime();
}
});
// 4. Key, window, and aggregate: per window, compute the max, min, and average of value
// (counts is always 1, so keyBy("counts") routes every record to a single key)
dataStream.keyBy("counts")
.timeWindow(Time.minutes(1), Time.seconds(20))
.apply(new VerstoreAggregate()).print();
// Execute the job
env.execute();
}
// Whole-window aggregation
public static class VerstoreAggregate implements WindowFunction<PowerInfo, Tuple4<Long, Double, Double, Double>, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<PowerInfo> iterable, Collector<Tuple4<Long, Double, Double, Double>> collector) throws Exception {
// Collect the window's values into a local list; a shared instance field would leak state across windows
ArrayList<Double> arrayList = new ArrayList<>();
Iterator<PowerInfo> iterator = iterable.iterator();
while (iterator.hasNext()) {
arrayList.add(iterator.next().getValue());
}
Double sum = 0.0;
Double max = 0.0;
Double min = 0.0;
// If the list is non-empty, seed max and min with the first element
if ( ! arrayList.isEmpty()) {
max = arrayList.get(0);
min = arrayList.get(0);
}
// Scan the list, accumulating the sum and tracking the max and min
for (Double aDouble : arrayList) {
sum += aDouble;
if (max < aDouble) {
max = aDouble;
}
if (min > aDouble) {
min = aDouble;
}
}
//System.out.println("count:"+arrayList.size());
//System.out.println("{max:" + max + " min:" + min + " sum:" + sum + " endTime:" + timeWindow.getEnd()+"}");
// 返回窗口内窗口结束时间,最大值,最小值,和平均值。
collector.collect(new Tuple4<>(timeWindow.getEnd(),max,min,sum/arrayList.size()));
arrayList.clear();
}
}
}
```
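One design note: VerstoreAggregate buffers every element of the window in a list before computing the statistics. For larger windows, Flink's incremental AggregateFunction folds each element into a small accumulator as it arrives, so nothing has to be buffered. A minimal sketch (the name StatsAggregate is hypothetical, not from the original code):
```java
import com.macro.flinkavg.entity.PowerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;

// Accumulator layout: (sum, max, min, count). Each element updates it in O(1),
// so the window never has to hold all records at once.
public class StatsAggregate implements
        AggregateFunction<PowerInfo, Tuple4<Double, Double, Double, Long>, Tuple3<Double, Double, Double>> {

    @Override
    public Tuple4<Double, Double, Double, Long> createAccumulator() {
        return new Tuple4<>(0.0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, 0L);
    }

    @Override
    public Tuple4<Double, Double, Double, Long> add(PowerInfo value, Tuple4<Double, Double, Double, Long> acc) {
        double v = value.getValue();
        return new Tuple4<>(acc.f0 + v, Math.max(acc.f1, v), Math.min(acc.f2, v), acc.f3 + 1);
    }

    @Override
    public Tuple3<Double, Double, Double> getResult(Tuple4<Double, Double, Double, Long> acc) {
        // (max, min, average); assumes a non-empty window, which Flink guarantees
        // since windows only fire after receiving at least one element
        return new Tuple3<>(acc.f1, acc.f2, acc.f0 / acc.f3);
    }

    @Override
    public Tuple4<Double, Double, Double, Long> merge(Tuple4<Double, Double, Double, Long> a, Tuple4<Double, Double, Double, Long> b) {
        return new Tuple4<>(a.f0 + b.f0, Math.max(a.f1, b.f1), Math.min(a.f2, b.f2), a.f3 + b.f3);
    }
}
```
It would slot in as `.aggregate(new StatsAggregate())` in place of `.apply(new VerstoreAggregate())`; to keep the window end time in the output as above, `aggregate` also accepts a second WindowFunction argument that receives the window metadata.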
III. Results
- Kafka producing the data
- Flink consuming the data and printing the results
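With `print()`, each emitted `Tuple4` appears on stdout in Flink's tuple notation `(f0,f1,f2,f3)`, i.e. `(windowEnd, max, min, average)`. A hypothetical sample line (the values are made up, not actual results):
```
(1571623200000,25.3,18.2,21.75)
```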