概念入门

特性

  • 分布式、基于Pub/Sub的消息系统
  • 以时间复杂度为O(1)的方式提供消息持久化的能力,保证海量数据的高性能
  • 高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上的消息的传输
  • 支持Server间的消息分区,分布式消费,也保证每个partition内的顺序传输,如果业务要求全局有序只能一个topic一个partition
  • 同时支持离线数据处理和实时数据处理
  • 支持在线水平扩展

    概念

  • Broker:Kafka集群包含的一个或多个服务器

  • Topic:每条发布到集群的消息都有一个类别即Topic
  • Partition:是物理上的概念,每个Topic包含一个或多个Partition
  • Producer:发布消息到Broker
  • Consumer:消息消费者,向Broker读取消息
  • ConsumerGroup:每个Consumer属于一个特定的组,如不指定组名则属于默认组

    单机部署结构

    单机消息处理.png

    集群部署结构

    集群消息处理.png

    多Partition

    多Partition.png

    Partition和Replica

    Partition和Replica.png

    Kafka-manager

  • kafka的web控制台

    简单使用

    单机

    ```bash wget ‘https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz‘ tar xf kafka_2.13-2.7.0.tgz cd kafka_2.13-2.7.0 vim config/server.properties

    listeners=PLAINTEXT://localhost:9092

./bin/zookeeper-server-start.sh config/zookeeper.properties ./bin/kafka-server-start.sh config/server.properties

命令行操作

bin/kafka-topics.sh —zookeeper localhost:2181 —list #查看topic bin/kafka-topics.sh —zookeeper localhost:2181 —create —topic testk —partitions 4 —replication-factor 1 bin/kafka-topics.sh —zookeeper localhost:2181 —describe —topic testk

bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —from-beginning —topic testk bin/kafka-console-producer.sh —bootstrap-server localhost:9092 —topic testk

简单性能测试

bin/kafka-producer-perf-test.sh —topic testk —num-records 1000000 —record-size 1000 —throughput 1000000 —producer-props bootstrap.servers=localhost:9092 bin/kafka-consumer-perf-test.sh —bootstrap-server localhost:9092 —topic testk —fetch-size 1048576 —messages 100000 —threads 1

  1. <a name="d3gAx"></a>
  2. ## Java
  3. ![java生产者.png](https://cdn.nlark.com/yuque/0/2021/png/1491874/1614833292565-440a95af-493f-4c4a-8414-247c966d6805.png#align=left&display=inline&height=710&margin=%5Bobject%20Object%5D&name=java%E7%94%9F%E4%BA%A7%E8%80%85.png&originHeight=710&originWidth=1016&size=417537&status=done&style=none&width=1016)<br />![java消费者.png](https://cdn.nlark.com/yuque/0/2021/png/1491874/1614833297916-dd1946e6-668b-4834-9d51-7dff6d28112d.png#align=left&display=inline&height=720&margin=%5Bobject%20Object%5D&name=java%E6%B6%88%E8%B4%B9%E8%80%85.png&originHeight=720&originWidth=1032&size=406976&status=done&style=none&width=1032)
  4. <a name="geJJP"></a>
  5. # 集群配置
  6. <a name="xxhpg"></a>
  7. ## 配置文件
  8. ```bash
  9. broker.id=1,2,3
  10. # 单个机器上部署三个节点需要配置不同的目录
  11. log.dirs=log1,log2,log3

高级特性

生产者

执行步骤

  • 客户端实现序列化,分区,压缩操作

生产者执行步骤.png

确认模式

  • acks=0 只发送不管有没有写入到broker
  • acks=1 写入到leader就认为成功
  • acks=-1/all 写入到最小的副本数则认为成功

确认模式.png

同步发送

同步发送.png

异步发送

异步发送.png

顺序保证

  • 必须是在同步发送的模式下才能保证
  • max.in.flight.requests.per.connnection = 1

顺序保证.png

消息可靠性传输

  • acks=-1/all; 保证多个副本
  • enable.idempotence=true; 打开幂等操作,此时也默认会把acsk设置为-1/all
  • transaction.id 增加事务ID

可靠性传输.png

消费者

触发groupRebalance

  • 一个consumer挂掉
  • 新加入一个consumer
  • topic里加入一个新的partition
  • 一个新的topic匹配已有的订阅规则(subscribed regx)

    consumerGroup与partition关系

    消费者与partition关系.png

    offset同步提交

  • offset记录consumer消费的位置

  • commit后offset才会被记录

offset同步提交.png

offset异步提交

异步提交.png

offset自动提交

自动提交.png

offsetSeek

offsetSeek.png
offsetSeekConsumer.png