一、Java操作案例
实现步骤
- 创建项目- 加入依赖- producer- consumer- 测试
Kafka中的kv说明
- kafka中的key不是必须设置的,当你不设置时,则key视为空- 若设置,就是复制partition分区和小消费者分配问题的
ProducerRecord参数:
- 参数说明
- topic—记录将被附加到的主题
- partition—将记录发送到的分区
- timestamp—记录的时间戳,从epoch开始的毫秒数。如果为空,生产者将使用System.currentTimeMillis()分配时间戳。
- key—将包含在记录中的密钥
- value—记录内容
- headers-将包含在记录中的标头
- 分区规则
- <1> 若指定Partition ID,则PR被发送至指定Partition
- <2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition
- <3> 若既未指定Partition ID也没指定Key,PR会按照round-robin模式发送到每个Partition
- <4> 若同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)
操作详情
public class KafkaProducerUtil {public static void main(String[] args) {//初始化broker列表和topicString brokerlist = "cluster1.hadoop:6667";String topic = "testmhk";//初始化工具类KafkaProducerUtil kafkaProducerUtil = new KafkaProducerUtil(brokerlist);//发送hello,kafkakafkaProducerUtil.producer.send(new ProducerRecord<String,String>(topic,"hello,kafka"));kafkaProducerUtil.close();System.out.println("发送数据完成");}public KafkaProducer<String, String> producer;public KafkaProducerUtil(String brokerList){Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);// key序列化指定类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// value序列化指定类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());producer = new KafkaProducer<String, String>(props);}public void close (){this.producer.close();}}
public class KafkaConsumerUtil {public static void main(String[] args) {//初始化broker列表和topicString brokerlist = "cluster1.hadoop:6667";String topic = "testmhk";//初始化工具类KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(brokerlist,topic);//消费数据boolean temp = true;while (temp){ConsumerRecords<String, String> record =kafkaConsumerUtil.consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record1:record){System.out.println(record1.key());System.out.println(record1.value());System.out.println(record1.offset());}}kafkaConsumerUtil.close();System.out.println("done");}public KafkaConsumer<String, String> consumer;public KafkaConsumerUtil(String brokerList,String topic){Properties props = new Properties();// 服务器ip:端口号,集群用逗号分隔props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);// // 消费者指定组,名称可以随意,注意相同消费组中的消费者只能对同一个分区消费一次// props.put(ConsumerConfig.GROUP_ID_CONFIG, "testmhk");// 是否启用自动提交offset,默认trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 自动提交间隔时间1sprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// key反序列化指定类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// value反序列化指定类,注意生产者与消费者要保持一致,否则解析出问题props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));}public void close (){consumer.close();}}
