基础
引入依赖包:
<!-- version 和 Flink的版本一致即可 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.1</version>
</dependency>
配置Kafka的属性:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.127.128:9092"); //我的Kafka地址
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
测试运行:
DataStreamSource<String> sensor = streamExecutionEnvironment.addSource(
new FlinkKafkaConsumer<String>("sensor2", // 订阅的Topic名称
new SimpleStringSchema(), // Kafka解析方式
properties) // Kafka属性
);
sensor.print().setParallelism(1);
streamExecutionEnvironment.execute();
输出结果:
在程序连接Kafka的过程中,可能会有远程连接问题,大家伙可以看这篇文章——《Kafka的远程连接问题》