1 Apache Kafka 基本操作
1.1 准备
本机ip:192.168.135.145
关闭防火墙:systemctl stop firewalld
1.2 启动Zookeeper
zookeeper-server-start.sh config/zookeeper.properties
1.3 启动Kafka Broker
kafka-server-start.sh config/server.properties
1.4 创建主题Topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
输出
Created topic test.
1.5 查看主题Topic
kafka-topics.sh --list --zookeeper localhost:2181
输出
test
1.6 修改主题Topic
1.6.1 修改分区数Partition
kafka-topics.sh --zookeeper localhost:2181 --alter --topic topicName --partitions 8
1.6.2 增加配置
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --config flush.messages=1
1.6.3 删除配置
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --delete-config flush.messages
1.6.4 删除主题Topic
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
1.7 生产者
kafka-console-producer.sh --broker-list 192.168.135.145:9092 --topic test
1.8 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.135.145:9092 --topic test --from-beginning
例子:
2 集群模式
克隆两台虚拟机
复制两个配置文件
cd /opt/kafka_2.11-2.4.0/
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
vim config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.135.145:9092
log.dirs=/tmp/kafka-logs-0
zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
host.name=192.168.135.145
vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://192.168.135.146:9092
log.dirs=/tmp/kafka-logs-0
zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
host.name=192.168.135.146
vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://192.168.135.147:9092
log.dirs=/tmp/kafka-logs-0
zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
host.name=192.168.135.147
克隆三个窗口
Broker1
kafka-server-start.sh config/server.properties
Broker2
kafka-server-start.sh config/server-1.properties
Broker3
kafka-server-start.sh config/server-2.properties
2.1 创建主题
创建三个分区,两个副本
kafka-topics.sh --create --zookeeper 192.168.135.145:2181 --replication-factor 2 --partitions 3 --topic test
2.2 查看主题
kafka-topics.sh --list --zookeeper 192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
2.3 生产者
kafka-console-producer.sh --broker-list 192.168.135.145:9092,192.168.135.146:9092,192.168.135.147:9092 --topic test
2.4 消费者
kafka-console-consumer.sh --from-beginning --topic test --bootstrap-server 192.168.135.145:9092,192.168.135.146:9092,192.168.135.147:9092
2.5 查看各节点运行情况
kafka-topics.sh --describe --zookeeper 192.168.135.145:2181 --topic test
输出
Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
2.6 查看主题消费进度
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
(注意:ConsumerOffsetChecker 在 Kafka 2.x 中已被移除,请改用 kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group pv)
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
topic:创建时topic名称
pid:分区编号
offset:表示该partition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
3 Java API
3.1 pom
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
3.2 生产者
/**
 * Minimal demo producer: publishes ten string messages to a fixed topic.
 * Requires a reachable broker at 192.168.135.145:9092 (firewall disabled).
 */
public class SimpleProducer {
    public static void main(String[] args) {
        // Topic the demo messages are published to.
        String topicName = "my-topic";

        // Producer configuration: broker address plus reliability and batching knobs.
        Properties config = new Properties();
        config.put("bootstrap.servers", "192.168.135.145:9092");
        config.put("acks", "all");             // wait until all in-sync replicas acknowledge
        config.put("retries", 0);              // no automatic retry on a failed send
        config.put("batch.size", 16384);       // per-partition batch buffer, in bytes
        config.put("linger.ms", 1);            // brief wait so records can be batched together
        config.put("buffer.memory", 33554432); // total memory for records awaiting transmission
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(config);
        int messageCount = 10;
        for (int seq = 0; seq < messageCount; seq++) {
            // Value-only record: the key is left null, so partitioning is round-robin/sticky.
            producer.send(new ProducerRecord<>(topicName, "hello world->" + Integer.toString(seq)));
        }
        System.out.println("Message sent successfully");
        producer.close(); // flush any buffered records and release network resources
    }
}
3.3 消费者
/**
 * Minimal demo consumer: joins group "test", subscribes to "my-topic" and
 * prints every record it receives, forever. Offsets are auto-committed.
 * Requires a reachable broker at 192.168.135.145:9092 (firewall disabled).
 */
public class SimpleConsumer {
    public static void main(String[] args) {
        // Topic to read from and the consumer group this instance joins.
        String topic = "my-topic";
        String group = "test";

        // Consumer configuration: broker address, group membership, and
        // auto-commit behaviour (offsets committed once per second).
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.135.145:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);
        while (true) {
            // poll(long) is deprecated since Kafka 2.0 (this doc uses clients 2.4.0);
            // the Duration overload bounds the call's total blocking time.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Print each received message together with its partition offset.
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}
注意:测试之前一定要关闭防火墙,否则建立不了连接。
输出
offset = 16, key = null, value = hello world->0
offset = 17, key = null, value = hello world->1
offset = 18, key = null, value = hello world->2
offset = 19, key = null, value = hello world->3
offset = 20, key = null, value = hello world->4
offset = 21, key = null, value = hello world->5
offset = 22, key = null, value = hello world->6
offset = 23, key = null, value = hello world->7
offset = 24, key = null, value = hello world->8
offset = 25, key = null, value = hello world->9