安装

单机版

  1. https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz

启动自带的zookeeper

  1. bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

  1. bin/kafka-server-start.sh config/server.properties &

测试

  1. # 创建主题
  2. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  3. # 查看主题
  4. bin/kafka-topics.sh --list --zookeeper localhost:2181
  5. bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic userLog
  6. # 启动生产者,启动后,在命令行下每输入一些字符串按下回车时,就作为一条消息发送到 Kafka
  7. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  8. # 启动消费者,另启动一个窗口
  9. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

image.png

  • 查看指定主题

image.png

集群

修改配置文件即可
image.png

基础

名词

Broker:消息中间件处理节点;每个Kafka服务节点称之为一个Broker,一个Kafka集群由一个或多个Broker组成

Topic:一类特定数据集合的统称;可类比DB中Table的概念;逻辑概念

Producer:消息的生产者,向Broker发送消息的客户端

Consumer:消息的消费者,从Broker读取消息的客户端

Consumer Group:每一个Consumer隶属于一个特定的Consumer Group;一条消息可以被不同Group中的Consumer各自消费一次,但在同一Group内,一条消息只能被其中一个Consumer消费

Partition:是对Topic中所包含数据集的物理分区;物理概念

Replication:副本集;是Kafka高可用的一种保障机制

image.png

image.png

image.png

image.png
image.png

image.png

image.png

image.png

高性能
image.png
image.png
image.png

JavaAPI

不同版本API可能会有区别

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.11</artifactId>
  4. <version>1.0.0</version>
  5. </dependency>
  1. package com.alvin.controller;
  2. import kafka.Kafka;
  3. import kafka.producer.KeyedMessage;
  4. import org.apache.kafka.clients.producer.KafkaProducer;
  5. import org.apache.kafka.clients.producer.ProducerRecord;
  6. import java.util.Properties;
  7. import java.util.UUID;
  8. /**
  9. * @Description
  10. * @Author 田云
  11. * @Date 2021/3/21 11:07
  12. * @Version 1.0
  13. */
  14. public class KafkaProducerSimple {
  15. public static void main(String[] args) {
  16. String TOPIC = "orderMq";
  17. //读取配置文件
  18. Properties props = new Properties();
  19. //1.指定Kafaka集群的ip地址和端口号
  20. props.put("bootstrap.servers","localhost:9092");
  21. //2.等待所有副本节点的应答
  22. props.put("acks","all");
  23. //3.消息发送最大尝试次数
  24. props.put("retries",0);
  25. //4.指定一批消息处理次数
  26. props.put("batch.size",16384);
  27. //5.指定请求延时
  28. props.put("linger.ms",1);
  29. //6.指定缓存区内存大小
  30. props.put("buffer.memory",33554432);
  31. //7.设置key序列化
  32. props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  33. //8.设置value序列化
  34. props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  35. //可选配置用来把消息分到各个partition中,默认是kafka.producer.DefaultPartioner,即对key进行hash
  36. //props.put("partitioner.class", "cn.mylove.storm.kafka.MyLogPartitioner");
  37. //通过配置文件创建生产者
  38. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  39. for (int i = 0; i < 50000; i++) {
  40. producer.send(new ProducerRecord<String, String>(TOPIC, Integer.toString(i), "ycf vs yy" + i));
  41. }
  42. producer.close();
  43. }
  44. }
  1. package com.alvin.controller;
  2. import kafka.consumer.Consumer;
  3. import kafka.consumer.ConsumerIterator;
  4. import kafka.consumer.KafkaStream;
  5. import kafka.javaapi.consumer.ConsumerConnector;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.clients.consumer.ConsumerRecords;
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;
  9. import java.time.Duration;
  10. import java.util.*;
  11. /**
  12. * @Description
  13. * @Author 田云
  14. * @Date 2021/3/21 11:45
  15. * @Version 1.0
  16. */
  17. public class KafkaConsumerSimple {
  18. public static void main(String[] args) {
  19. String topic = "orderMq";
  20. Properties props = new Properties();
  21. props.put("bootstrap.servers", "localhost:9092");
  22. props.put("group.id", "GROUP");
  23. props.put("auto.commit.interval.ms", "1000");
  24. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  25. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  26. try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {
  27. consumer.subscribe(Arrays.asList(topic));
  28. for (int i = 0; i < 1000; i++) {
  29. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
  30. System.out.println("Size: " + records.count());
  31. for (ConsumerRecord<String, String> record : records) {
  32. System.out.println("Received a message: " + record.key() + " " + record.value());
  33. }
  34. }
  35. }
  36. System.out.println("End");
  37. }
  38. }