Goal: read data from a Kafka topic with Flink
Environment: JDK 8; Tool: IDEA 2019.3

Create a Maven project

Add the dependencies to pom.xml

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

Code example

The DataStream API is used for real-time data processing.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class KafkaSourceJob {
        public static void main(String[] args) throws Exception {
            // Create the stream execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Kafka connection properties
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.18.216:9092");
            props.put("group.id", "consumer-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "latest");
            // Consume the "test" topic as a stream of strings
            DataStream<String> dataStream = env.addSource(
                    new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props));
            dataStream.print();
            // Submit the job; execute() blocks while the streaming job runs
            env.execute();
        }
    }


Result

The number before ">" identifies the parallel subtask (thread) that printed the record. The SLF4J lines only mean that no logging binding is on the classpath; they do not affect the job.

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    3> haha
    3> hello word !
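
The prefix comes from running the print sink with more than one parallel subtask. A minimal sketch that runs the sink with a single subtask (setParallelism on the print sink is standard Flink API; dataStream is the stream from the job above):

    // Run the print sink with one subtask; Flink then omits the numeric prefix
    dataStream.print().setParallelism(1);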


Quickly set up a Kafka environment with Docker

Local VM: CentOS 7 (Linux)

Pull the Kafka and ZooKeeper images

    docker pull wurstmeister/kafka
    docker pull wurstmeister/zookeeper

Start ZooKeeper first, then Kafka

    # 1. Start ZooKeeper
    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    # 2. Start Kafka
    docker run --name kafka01 \
      -p 9092:9092 \
      -e KAFKA_BROKER_ID=0 \
      -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.216:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.216:9092 \
      -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
      -d wurstmeister/kafka

Create a topic

    # Enter the container
    docker exec -it kafka01 /bin/bash
    # Create the topic "test"
    /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.18.216:2181 --replication-factor 1 --partitions 1 --topic test
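
If shelling into the container is inconvenient, the topic can also be created from Java with Kafka's AdminClient. This is a minimal sketch, assuming the org.apache.kafka:kafka-clients library is on the classpath (it usually comes in transitively with the Flink Kafka connector); the CreateTopic class name is only illustrative:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.18.216:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Topic "test": 1 partition, replication factor 1 (matches the CLI command above)
                NewTopic topic = new NewTopic("test", 1, (short) 1);
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }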

Send data to the topic

    # Inside the container, start a console producer for the topic
    kafka-console-producer.sh --broker-list 192.168.18.216:9092 --topic test

Sending example: each line typed at the console producer prompt is published to the topic as a separate message (for instance the "haha" and "hello word !" lines that appear in the Flink output above). A programmatic alternative is sketched below.

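The same messages can be sent from Java instead of the console producer. A minimal sketch, again assuming kafka-clients is on the classpath; the ProducerDemo class name is only illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.18.216:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Each send() publishes one record to the "test" topic
                producer.send(new ProducerRecord<>("test", "haha"));
                producer.send(new ProducerRecord<>("test", "hello word !"));
                producer.flush();
            }
        }
    }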