Basics

Add the dependency:

<!-- the version only needs to match your Flink version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.11.1</version>
</dependency>

Configure the Kafka properties:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.127.128:9092"); //我的Kafka地址
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

Run a quick test:

DataStreamSource<String> sensor = streamExecutionEnvironment.addSource(
                new FlinkKafkaConsumer<String>("sensor2",  // topic to subscribe to
                                                new SimpleStringSchema(), // how to deserialize the record values
                                                properties) // the Kafka properties configured above
);
sensor.print().setParallelism(1);
streamExecutionEnvironment.execute();
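
For reference, here are the snippets above assembled into one self-contained class (a sketch: the class name and job name are my own, everything else comes from the snippets):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo { // hypothetical class name

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.127.128:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Kafka source: subscribe to the sensor2 topic and deserialize each record value as a String
        DataStreamSource<String> sensor = streamExecutionEnvironment.addSource(
                new FlinkKafkaConsumer<>("sensor2", new SimpleStringSchema(), properties));

        sensor.print().setParallelism(1);
        streamExecutionEnvironment.execute("kafka-source-demo");
    }
}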

Output: the consumed messages are printed to the console.
If the program has trouble making a remote connection to Kafka, see the article 《Kafka的远程连接问题》 (Kafka remote connection issues).
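
A frequent cause of that problem (which may or may not be what the linked article covers) is the broker advertising an address the client cannot reach. For a typical single-broker setup, pointing advertised.listeners in the broker's config/server.properties at its externally reachable address usually resolves it, roughly:

# config/server.properties (sketch for a single broker reachable at 192.168.127.128)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.127.128:9092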