Docker搭建

  • 分别搭建zookeeper和kafka容器即可
  • 脚本如下
  • docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
  • docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.155.56:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.155.56:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
  • 参数说明
    • KAFKA_ZOOKEEPER_CONNECT:zk地址
    • ADVERTISED_LISTENERS:将自己注册到zk上的地址,需要自己的外网IP
    • LISTENERS:kafka绑定的监听端口,如果是localhost/127.0.0.1那就只能本地访问;外网IP也绑定不了;需要是0.0.0.0
  • 刚开始搭建时,kafka运行后直接退出,于是加了-t /bin/bash参数,但是进去后发现,进程中并没有kafka进程,并且查看docker日志发现也没有输出
  • 原因时kafka启动后由于错误直接退出了;这时候就不要让容器继续运行下去了,当容器失败时直接去查看日志,会发现是有日志输出的;由于内存不足,所以增加kafka jvm内存大小

生产者

发送消息

  • 创建生产者时,需要设置基本必选的属性
    • bootstrap.servers:包含了broker的地址清单,格式为host:port,清单里不需要包含所有主机(可以从一个broker中找到其他broker),但是推荐至少两个,防止宕机;
    • key.serializer:设置数据键的序列化器,该值是一个实现了Serializer接口的类;Kafka客户端默认提供了ByteArraySerializer,StringSerializer,IntegerSerializer;一般无需自定义
    • value.serializer:数据值的序列化器
  • 消息发送方式
    • 发送并忘记:这种其实就是直接发送消息,不管结果如何;但是对于一些发送失败的情况,kafka会自动重试
    • 同步发送:消息发送后,会返回一个Future对象,直接阻塞等待即可
    • 异步发送:发送时,传入回调函数
  • 异常返回
    • 如果发送正常,会返回一个RecordMetatdata对象,可以用于获取消息偏移量
    • 如果发生错误,那么会抛出异常
    • Kafka生产者一般会发生两种错误,一种是可重试错误(会一直重试发送,直到超过次数抛出异常);另一种是不可重试错误,例如消息过大,直接抛异常

生产者配置

  • acks:制定了必须有多少个分区副本收到消息,生产者才认为消息写入成功了,该参数对消息丢失可能有重大影响
    • acks=0:生产者一旦将消息成功发出,就认为消息成功写入了,不会等待服务器响应;吞吐量最高
    • acks=1:kafka默认方式,只要集群的首领节点收到消息,生产者就会收到来自服务器的响应,认为写入成功;如果没有收到,生产者会重发消息;极端情况下,如果分区首领收到后写入并回复客户端成功,还没同步给其他节点就宕机了(严重的磁盘损坏),那么数据会丢失;吞吐量高低看客户端是同步还是异步发送;使用异步发送可以使等待回复的延迟降低
    • acks=all:只有所有参与复制的节点全部收到消息,生产者才会收到服务器的响应。这时最安全的,可以保证不止一个服务器收到消息,某些宕机了也可以运行;不过吞吐量低
  • buffer.memory:设置生产者内存缓冲区大小,生产者用来缓冲要发送的消息;如果程序发送消息速度超过发送到服务器的速度,就会导致生产者空间不足;这时调用send要么阻塞,要么抛异常;取决于block.on.buffer.full参数(0.9.0.0版本被替换为max.block.ms
  • compression.type:默认情况下,消息发送不会被压缩;该参数可以设置为snappy,gzip,lz4,制定了消息被发送给broker之前使用哪种压缩算法压缩;使用压缩可以降低网络传输开销和存储开销
  • retries:指定了生产者重发消息次数;如果到达了次数,生产者会放弃重试并返回错误;默认重试间隔100ms,通过retry.backoff.ms指定;可以先测试一下节点崩溃的恢复时间和分区首领选举时间,让总的重试时间比Kafka集群从崩溃中恢复的时间长;避免过早放弃重试;一般情况下,生产者会自动重试,所以没必要在代码逻辑中处理可重试错误,而应该处理不可重试错误或超过重试次数的错误
  • batch.size:当多个消息被发送到同一个分区时,生产者会把他们放在同一个批次里,该参数指定了一个批次可以使用的内存大小;按字节数为单位;当批次被填满时,批次里的所有消息被发送;不过生产者不一定等到批次满了才会发送;所以批次设置大不会导致延迟,而设置小可能会导致频繁发送
  • linger.ms:指定了生产者发送批次之前等待更多消息加入的时间,Kafka会在批次填满或等待到达该时间时把消息发送出去;默认情况下,只要有可用线程,即使批次里只有一个消息也会发送;把linger.ms设置大于0,会让生产者发送批次前等 一会,让更多消息加入;虽然增加了延迟,但是提高吞吐量
  • client.id:服务器用于识别消息来源
  • max.in.flight.request.per.connection:指定了生产者在接收到服务器响应之前可以发送多少个消息;值越高,会占用更多内存,但是可以提高吞吐量;设置为1时,可以保证消息发送是按照写入顺序发送的,即使发生重试
  • timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms
    • request.ms:制定了生产者发送数据时等待服务器返回响应的时间
    • metadata.fetch.timeout.ms:指定了生产者获取元数据(比如目标分区首领是谁)时等待服务器返回响应时间
    • 如果等待超时,生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)
    • timeout.ms:指定了broker等待同步副本返回消息确认的时间与acks的配置相匹配——如果在指定时间内没有收到同步副本确认,那么broker就会返回错误
  • max.block.ms