目的:读取kafka主题中的数据
环境:jdk8 工具:idea2019.3
创建maven项目
引入jar包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
代码演示
DataStream用于实时数据统计
//创建流处理执行环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//Kafka连接配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.18.216:9092");
props.put("group.id","consumer-group" );
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
//接受数据
DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("test",new SimpleStringSchema(),props));
dataStream.print();
//执行
env.execute();
Kafka读取数据.rar
结果
>前代表线程
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
3> haha
3> hello word !
Docker快速搭建kafka环境
本地虚拟机: Linux CentOS7
下载Kafka和Zookeeper镜像文件
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper
先运行zk,再运行kafka
1,启动zk
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
2,启动kafka
docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.18.216:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.216:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d wurstmeister/kafka
创建主题
进入到容器中:
docker exec -it kafka01 /bin/bash
创建主题: test
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.18.216:2181 --replication-factor 1 --partitions 1 --topic test
向topic发送数据
进入topic
kafka-console-producer.sh --broker-list 192.168.18.216:9092 --topic test