一、Java操作案例

实现步骤

  1. - 创建项目
  2. - 加入依赖
  3. - producer
  4. - consumer
  5. - 测试

Kafka中的kv说明

  1. - kafka中的key不是必须设置的,当你不设置时,则key视为空
  2. - 若设置,就是复制partition分区和小消费者分配问题的

ProducerRecord参数:

  • 参数说明
    • topic—记录将被附加到的主题
    • partition—将记录发送到的分区
    • timestamp—记录的时间戳,从epoch开始的毫秒数。如果为空,生产者将使用System.currentTimeMillis()分配时间戳。
    • key—将包含在记录中的密钥
    • value—记录内容
    • headers-将包含在记录中的标头
  • 分区规则
    • <1> 若指定Partition ID,则PR被发送至指定Partition
    • <2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition
    • <3> 若既未指定Partition ID也没指定Key,PR会按照round-robin模式发送到每个Partition
    • <4> 若同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)

操作详情

  1. public class KafkaProducerUtil {
  2. public static void main(String[] args) {
  3. //初始化broker列表和topic
  4. String brokerlist = "cluster1.hadoop:6667";
  5. String topic = "testmhk";
  6. //初始化工具类
  7. KafkaProducerUtil kafkaProducerUtil = new KafkaProducerUtil(brokerlist);
  8. //发送hello,kafka
  9. kafkaProducerUtil.producer.send(new ProducerRecord<String,String>(topic,"hello,kafka"));
  10. kafkaProducerUtil.close();
  11. System.out.println("发送数据完成");
  12. }
  13. public KafkaProducer<String, String> producer;
  14. public KafkaProducerUtil(String brokerList){
  15. Properties props = new Properties();
  16. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  17. // key序列化指定类
  18. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  19. StringSerializer.class.getName());
  20. // value序列化指定类
  21. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  22. StringSerializer.class.getName());
  23. producer = new KafkaProducer<String, String>(props);
  24. }
  25. public void close (){
  26. this.producer.close();
  27. }
  28. }
  1. public class KafkaConsumerUtil {
  2. public static void main(String[] args) {
  3. //初始化broker列表和topic
  4. String brokerlist = "cluster1.hadoop:6667";
  5. String topic = "testmhk";
  6. //初始化工具类
  7. KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(brokerlist,topic);
  8. //消费数据
  9. boolean temp = true;
  10. while (temp){
  11. ConsumerRecords<String, String> record =kafkaConsumerUtil
  12. .consumer.poll(Duration.ofSeconds(1));
  13. for (ConsumerRecord<String,String> record1:record){
  14. System.out.println(record1.key());
  15. System.out.println(record1.value());
  16. System.out.println(record1.offset());
  17. }
  18. }
  19. kafkaConsumerUtil.close();
  20. System.out.println("done");
  21. }
  22. public KafkaConsumer<String, String> consumer;
  23. public KafkaConsumerUtil(String brokerList,String topic){
  24. Properties props = new Properties();
  25. // 服务器ip:端口号,集群用逗号分隔
  26. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  27. // // 消费者指定组,名称可以随意,注意相同消费组中的消费者只能对同一个分区消费一次
  28. // props.put(ConsumerConfig.GROUP_ID_CONFIG, "testmhk");
  29. // 是否启用自动提交offset,默认true
  30. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  31. // 自动提交间隔时间1s
  32. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  33. // key反序列化指定类
  34. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  35. StringDeserializer.class.getName());
  36. // value反序列化指定类,注意生产者与消费者要保持一致,否则解析出问题
  37. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  38. StringDeserializer.class.getName());
  39. consumer = new KafkaConsumer<String, String>(props);
  40. consumer.subscribe(Arrays.asList(topic));
  41. }
  42. public void close (){
  43. consumer.close();
  44. }
  45. }