安装
单机版
https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
启动自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka
bin/kafka-server-start.sh config/server.properties &
测试
# 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic userLog
# 启动生产者,启动后,在命令行下每输入一些字符串按下回车时,就作为一个消息并发送的kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 启动消费者,另启动一个窗口
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
- 查看指定主题
集群
基础
名词
Broker:消息中间件处理节点;每个Kafka服务节点称之为一个Broker,一个Kafka集群由一个或多个Broker组成
Topic:一类特定数据集合的统称;可类比DB中Table的概念;逻辑概念
Producer:消息的生产者,向Broker发送消息的客户端
Consumer:消息的消费者,向Broker读取消息的客户端
Consumer Group:每一个Consumer隶属于一个特定的Consumer Group,一条消息可以被不同Group中的Consumer消费,但同一Group内的消息只能被一个Consumer消费
Partition:是对Topic中所包含数据集的物理分区;物理概念
Replication:副本集;是Kafka高可用的一种保障机制
JavaAPI
不同版本API可能会有区别
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
package com.alvin.controller;
import kafka.Kafka;
import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.UUID;
/**
* @Description
* @Author 田云
* @Date 2021/3/21 11:07
* @Version 1.0
*/
public class KafkaProducerSimple {
public static void main(String[] args) {
String TOPIC = "orderMq";
//读取配置文件
Properties props = new Properties();
//1.指定Kafaka集群的ip地址和端口号
props.put("bootstrap.servers","localhost:9092");
//2.等待所有副本节点的应答
props.put("acks","all");
//3.消息发送最大尝试次数
props.put("retries",0);
//4.指定一批消息处理次数
props.put("batch.size",16384);
//5.指定请求延时
props.put("linger.ms",1);
//6.指定缓存区内存大小
props.put("buffer.memory",33554432);
//7.设置key序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//8.设置value序列化
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//可选配置用来把消息分到各个partition中,默认是kafka.producer.DefaultPartioner,即对key进行hash
//props.put("partitioner.class", "cn.mylove.storm.kafka.MyLogPartitioner");
//通过配置文件创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 50000; i++) {
producer.send(new ProducerRecord<String, String>(TOPIC, Integer.toString(i), "ycf vs yy" + i));
}
producer.close();
}
}
package com.alvin.controller;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.*;
/**
* @Description
* @Author 田云
* @Date 2021/3/21 11:45
* @Version 1.0
*/
public class KafkaConsumerSimple {
public static void main(String[] args) {
String topic = "orderMq";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "GROUP");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {
consumer.subscribe(Arrays.asList(topic));
for (int i = 0; i < 1000; i++) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
System.out.println("Size: " + records.count());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received a message: " + record.key() + " " + record.value());
}
}
}
System.out.println("End");
}
}