1 Apache Kafka 基本操作
1.1 准备
本机ip:192.168.135.145
关闭防火墙:systemctl stop firewalld
1.2 启动Zookeeper
zookeeper-server-start.sh config/zookeeper.properties
1.3 启动Kafka Broker
kafka-server-start.sh config/server.properties
1.4 创建主题Topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions 1 --topic test
输出
Created topic test.
1.5 查看主题Topic
kafka-topics.sh --list --zookeeper localhost:2181
输出
test
1.6 修改主题Topic
1.6.1 修改分区数Partition
kafka-topics.sh --zookeeper localhost:2181 --alter --topic topicName --partitions 8
1.6.2 增加配置
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --config flush.messages=1
1.6.3 删除配置
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --delete-config flush.messages
1.6.4 删除主题Topic
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
1.7 生产者
kafka-console-producer.sh --broker-list 192.168.135.145:9092 --topic test
1.8 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.135.145:9092 --topic test --from-beginning
例子:
2 集群模式
克隆两台虚拟机
复制两个配置文件
cd /opt/kafka_2.11-2.4.0/cp config/server.properties config/server-1.propertiescp config/server.properties config/server-2.propertiesvim config/server-1.propertiesbroker.id=0listeners=PLAINTEXT://192.168.135.145:9092log.dirs=/tmp/kafka-logs-0zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181host.name=192.168.135.145vim config/server-1.propertiesbroker.id=1listeners=PLAINTEXT://192.168.135.146:9092log.dirs=/tmp/kafka-logs-0zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181host.name=192.168.135.146vim config/server-2.propertiesbroker.id=1listeners=PLAINTEXT://192.168.135.147:9092log.dirs=/tmp/kafka-logs-0zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181host.name=192.168.135.147
克隆三个窗口
Broker1kafka-server-start.sh config/server.propertiesBroker2kafka-server-start.sh config/server-1.propertiesBroker3kafka-server-start.sh config/server-2.properties
2.1 创建主题
创建三个分区,两个副本
kafka-topics.sh --create --zookeeper 192.168.135.145:2181 --replication-factor 2 -- partitions 3 --topic test
2.2 查看主题
kafka-topics.sh --list --zookeeper 192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
2.3 生产者
kafka-console-producer.sh --broker-list 192.168.135.145:9092,192.168.135.146:9092,192.168.135.147:9092 --topic test
2.4 消费者
kafka-console-consumer.sh --from-beginning --topic test --zookeeper 192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
2.5 查看各节点运行情况
kafka-topics.sh --describe --zookeeper 192.168.135.145:2181 --topic test
输出
Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:<br /> Topic: test Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
2.6 查看主题消费进度
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pvGroup Topic Pid Offset logSize Lag Ownerpv page_visits 0 21 21 0 nonepv page_visits 1 19 19 0 nonepv page_visits 2 20 20 0 none
topic:创建时topic名称
pid:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
3 Java API
3.1 pom
<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.4.0</version></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build>
3.2 生产者
public class SimpleProducer {public static void main(String[] args) {//Assign topicName to string variableString topicName = "my-topic";// create instance for properties to access producer configsProperties props = new Properties();//Assign localhost idprops.put("bootstrap.servers", "192.168.135.145:9092");//Set acknowledgements for producer requests.props.put("acks", "all");//If the request fails, the producer can automatically retry,props.put("retries", 0);//Specify buffer size in configprops.put("batch.size", 16384);//Reduce the no of requests less than 0props.put("linger.ms", 1);//The buffer.memory controls the total amount of memory available to the producer for buffering.props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<> (props);for (int i = 0; i < 10; i++) {// 发送消息producer.send(new ProducerRecord<>(topicName,"hello world->"+ Integer.toString(i)));}System.out.println("Message sent successfully");producer.close();}}
3.3 消费者
public class SimpleConsumer {public static void main(String[] args) {String topic = "my-topic";String group = "test";Properties props = new Properties();props.put("bootstrap.servers", "192.168.135.145:9092");props.put("group.id", group);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));System.out.println("Subscribed to topic " + topic);while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 接收消息System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}}}
注意:测试之前一定要关闭防火墙,否则建立不了连接。
输出
offset = 16, key = null, value = hello world->0
offset = 17, key = null, value = hello world->1
offset = 18, key = null, value = hello world->2
offset = 19, key = null, value = hello world->3
offset = 20, key = null, value = hello world->4
offset = 21, key = null, value = hello world->5
offset = 22, key = null, value = hello world->6
offset = 23, key = null, value = hello world->7
offset = 24, key = null, value = hello world->8
offset = 25, key = null, value = hello world->9


