1.简单讲解

对于这个简短的程序,逻辑还是非常简单的。

  1. 首先创建一个流处理环境env,
  2. 然后往这个环境添加数据源,比如env.socketTextStream,和env.addSource(kafkaSource);
  3. 自定义算子,算子的作用是对传输过来的每一条数据进行处理,是数据处理的核心部分。在下面的程序里,我们重写了flatMap()方法,flatMap+keyBy+sum就完成了一条消息的word count。
  4. env.execute()执行任务。实际上前面的部分是我们定义了这个任务的执行规则,到这一行才开始执行任务,按上述规则对每个事件(指收到消息)进行处理。

    2.maven依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.slf4j</groupId>
    4. <artifactId>slf4j-log4j12</artifactId>
    5. <version>1.7.7</version>
    6. <scope>runtime</scope>
    7. </dependency>
    8. <dependency>
    9. <groupId>log4j</groupId>
    10. <artifactId>log4j</artifactId>
    11. <version>1.2.17</version>
    12. <scope>runtime</scope>
    13. </dependency>
    14. <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    15. <dependency>
    16. <groupId>org.apache.kafka</groupId>
    17. <artifactId>kafka_2.11</artifactId>
    18. <version>2.3.1</version>
    19. </dependency>
    20. <dependency>
    21. <groupId>org.apache.kafka</groupId>
    22. <artifactId>kafka-clients</artifactId>
    23. <version>2.3.1</version>
    24. </dependency>
    25. <!-- Apache Flink dependencies -->
    26. <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    27. <dependency>
    28. <groupId>org.apache.flink</groupId>
    29. <artifactId>flink-java</artifactId>
    30. <version>1.10.2</version>
    31. <scope>provided</scope>
    32. </dependency>
    33. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    34. <dependency>
    35. <groupId>org.apache.flink</groupId>
    36. <artifactId>flink-streaming-java_2.11</artifactId>
    37. <version>1.10.2</version>
    38. <scope>provided</scope>
    39. </dependency>
    40. <dependency>
    41. <groupId>org.apache.flink</groupId>
    42. <artifactId>flink-connector-kafka-base_2.11</artifactId>
    43. <version>1.10.2</version>
    44. </dependency>
    45. <dependency>
    46. <groupId>org.apache.flink</groupId>
    47. <artifactId>flink-connector-kafka_2.11</artifactId>
    48. <version>1.10.2</version>
    49. </dependency>

    需要注意的是我们有两处provided,所以要在IDEA->Run->edit configurations里选上 Include provided scope那一行
    image.png

3.Java代码

flinkstream类,接收Kafka生产者发送的消息,对每条消息进行word count

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  8. import org.apache.flink.util.Collector;
  9. import java.util.Properties;
  10. public class flinkstream {
  11. public static void main(String[] args) throws Exception {
  12. //创建流处理环境
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. //接收一个socket文本流,参数:主机名,端口号
  15. // DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
  16. System.out.println("设置kafka连接参数!!!");
  17. Properties props = new Properties();
  18. props.setProperty("bootstrap.servers", "172.27.210.57:9092");
  19. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  20. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  21. props.put("group.id", "cug/kafka/test");
  22. //如果没有记录偏移量,第一次从最开始消费
  23. //props.setProperty("auto.offset.reset", "earliest");
  24. //kafka的消费者不自动提交偏移量
  25. // props.setProperty("enable.auto.commit","false");
  26. System.out.println("给flink添加上kafka数据源!!!");
  27. //kafkaSource,flink消费指定topic的数据
  28. FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("lt", new SimpleStringSchema(), props);
  29. DataStream<String> lines = env.addSource(kafkaSource);
  30. //进行转换处理
  31. DataStream<Tuple2<String, Integer>> dataStream = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  32. @Override
  33. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  34. String[] tokens = s.toLowerCase().split("\\W+");
  35. for (String token : tokens) {
  36. if (token.length() > 0) {
  37. collector.collect(new Tuple2<String, Integer>(token, 1));
  38. }
  39. }
  40. }
  41. }).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
  42. //keyby是分组 ,sum叠加
  43. //打印结果
  44. dataStream.print();
  45. //启动任务执行,execute可以不给参数,参数是作业名字
  46. env.execute("Java WordCount from SocketTextStream Example");
  47. }
  48. }

TestProducer,一个简单的Kafka生产者,发送消息出去

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.util.Properties;
  4. import java.util.Scanner;
  5. public class TestProducer {
  6. public static void main(String[] args) {
  7. Properties props = new Properties();
  8. props.setProperty("bootstrap.servers", "172.27.210.57:9092");
  9. props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10. props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  12. String topic = "lt";
  13. while(true){
  14. System.out.println("请输入一个字符串");
  15. Scanner sc = new Scanner(System.in);
  16. String str = sc.nextLine();
  17. producer.send(new ProducerRecord<>(topic, str));
  18. }
  19. }
  20. }

4.运行结果

Kafka发送两条消息
image.png
flink对每条消息的处理结果
image.png
如果不懂Kafka,可以换成netcat发送数据,
18行取消注释
DataStream text = env.socketTextStream(“127.0.0.1”, 9000);
36行lines改成text
DataStream> dataStream = lines.flatMap()
注释调20-33行Kafka那一段
运行netcat,结果如图
image.png
image.png

5.关于重写方法

个人觉得上面那种写法有点不易读,主要是实现了flatMap()里面的接口FlatMapFunction,补全的它的方法flatMap(),可以写一个类来实现它,改动如下:

  1. DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new Splitter()).keyBy(0).sum(1);
  1. public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  2. @Override
  3. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  4. String[] tokens = s.toLowerCase().split("\\W+");
  5. for (String token : tokens) {
  6. if (token.length() > 0) {
  7. collector.collect(new Tuple2<String, Integer>(token, 1));
  8. }
  9. }
  10. }
  11. }

flatMap对应的FlatMapFunction为例,它在源码中的定义为:

  1. public interface FlatMapFunction<T, O> extends Function, Serializable {
  2. void flatMap(T value, Collector<O> out) throws Exception;
  3. }

这是一个接口类,它继承了Flink的Function函数式接口。函数式接口只有一个抽象函数方法(Single Abstract Method),其目的是为了方便Java 8 Lambda表达式的使用。此外,它还继承了Serializable,以便进行序列化,这是因为这些函数在运行过程中要发送到各个TaskManager上,发送前后要进行序列化和反序列化。需要注意的是,使用这些函数时,一定要保证函数内的所有内容都可以被序列化。如果有一些不能被序列化的内容,或者使用接下来介绍的Rich函数类,或者重写Java的序列化和反序列化方法。
进一步观察FlatMapFunction发现,这个这个函数有两个泛型T和O,T是输入,O是输出,在使用时,要设置好对应的输入和输出数据类型。自定义函数最终归结为重写函数flatMap,函数的两个参数也与输入输出的泛型类型对应,即参数value的是flatMap的输入,数据类型是T,参数out是flatMap的输出,我们需要将类型为O的数据写入out。
在我们的程序里,泛型O用了Tuple2,Tuple2又是一个泛型类,是flink里定义的,在里面设置参数类型为,就可以做到单词计数了。