基础
引入依赖包:
<!-- version 和 Flink的版本一致即可 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.11.1</version></dependency>
配置Kafka的属性:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.127.128:9092"); //我的Kafka地址
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
测试运行:
DataStreamSource<String> sensor = streamExecutionEnvironment.addSource(
new FlinkKafkaConsumer<String>("sensor2", // 订阅的Topic名称
new SimpleStringSchema(), // Kafka解析方式
properties) // Kafka属性
);
sensor.print().setParallelism(1);
streamExecutionEnvironment.execute();
输出结果:
在程序连接Kafka的过程中,可能会有远程连接问题,大家伙可以看这篇文章——《Kafka的远程连接问题》
