安装
单机版
https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
启动自带的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动kafka
bin/kafka-server-start.sh config/server.properties &
测试
# 创建主题bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 查看主题bin/kafka-topics.sh --list --zookeeper localhost:2181bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic userLog# 启动生产者,启动后,在命令行下每输入一些字符串按下回车时,就作为一个消息并发送的kafkabin/kafka-console-producer.sh --broker-list localhost:9092 --topic test# 启动消费者,另启动一个窗口bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

- 查看指定主题

集群
基础
名词
Broker:消息中间件处理节点;每个Kafka服务节点称之为一个Broker,一个Kafka集群由一个或多个Broker组成
Topic:一类特定数据集合的统称;可类比DB中Table的概念;逻辑概念
Producer:消息的生产者,向Broker发送消息的客户端
Consumer:消息的消费者,向Broker读取消息的客户端
Consumer Group:每一个Consumer隶属于一个特定的Consumer Group,一条消息可以被不同Group中的Consumer消费,但同一Group内的消息只能被一个Consumer消费
Partition:是对Topic中所包含数据集的物理分区;物理概念
Replication:副本集;是Kafka高可用的一种保障机制








JavaAPI
不同版本API可能会有区别
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.0</version></dependency>
package com.alvin.controller;import kafka.Kafka;import kafka.producer.KeyedMessage;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;import java.util.UUID;/*** @Description* @Author 田云* @Date 2021/3/21 11:07* @Version 1.0*/public class KafkaProducerSimple {public static void main(String[] args) {String TOPIC = "orderMq";//读取配置文件Properties props = new Properties();//1.指定Kafaka集群的ip地址和端口号props.put("bootstrap.servers","localhost:9092");//2.等待所有副本节点的应答props.put("acks","all");//3.消息发送最大尝试次数props.put("retries",0);//4.指定一批消息处理次数props.put("batch.size",16384);//5.指定请求延时props.put("linger.ms",1);//6.指定缓存区内存大小props.put("buffer.memory",33554432);//7.设置key序列化props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//8.设置value序列化props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//可选配置用来把消息分到各个partition中,默认是kafka.producer.DefaultPartioner,即对key进行hash//props.put("partitioner.class", "cn.mylove.storm.kafka.MyLogPartitioner");//通过配置文件创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i = 0; i < 50000; i++) {producer.send(new ProducerRecord<String, String>(TOPIC, Integer.toString(i), "ycf vs yy" + i));}producer.close();}}
package com.alvin.controller;import kafka.consumer.Consumer;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.*;/*** @Description* @Author 田云* @Date 2021/3/21 11:45* @Version 1.0*/public class KafkaConsumerSimple {public static void main(String[] args) {String topic = "orderMq";Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "GROUP");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {consumer.subscribe(Arrays.asList(topic));for (int i = 0; i < 1000; i++) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));System.out.println("Size: " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.println("Received a message: " + record.key() + " " + record.value());}}}System.out.println("End");}}


