一、Java操作案例
实现步骤
- 创建项目
- 加入依赖
- producer
- consumer
- 测试
Kafka中的kv说明
- kafka中的key不是必须设置的,当你不设置时,则key视为空
- 若设置,就是复制partition分区和小消费者分配问题的
ProducerRecord参数:
- 参数说明
- topic—记录将被附加到的主题
- partition—将记录发送到的分区
- timestamp—记录的时间戳,从epoch开始的毫秒数。如果为空,生产者将使用System.currentTimeMillis()分配时间戳。
- key—将包含在记录中的密钥
- value—记录内容
- headers-将包含在记录中的标头
- 分区规则
- <1> 若指定Partition ID,则PR被发送至指定Partition
- <2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition
- <3> 若既未指定Partition ID也没指定Key,PR会按照round-robin模式发送到每个Partition
- <4> 若同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)
操作详情
public class KafkaProducerUtil {
public static void main(String[] args) {
//初始化broker列表和topic
String brokerlist = "cluster1.hadoop:6667";
String topic = "testmhk";
//初始化工具类
KafkaProducerUtil kafkaProducerUtil = new KafkaProducerUtil(brokerlist);
//发送hello,kafka
kafkaProducerUtil.producer.send(new ProducerRecord<String,String>(topic,"hello,kafka"));
kafkaProducerUtil.close();
System.out.println("发送数据完成");
}
public KafkaProducer<String, String> producer;
public KafkaProducerUtil(String brokerList){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
// key序列化指定类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// value序列化指定类
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producer = new KafkaProducer<String, String>(props);
}
public void close (){
this.producer.close();
}
}
public class KafkaConsumerUtil {
public static void main(String[] args) {
//初始化broker列表和topic
String brokerlist = "cluster1.hadoop:6667";
String topic = "testmhk";
//初始化工具类
KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(brokerlist,topic);
//消费数据
boolean temp = true;
while (temp){
ConsumerRecords<String, String> record =kafkaConsumerUtil
.consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> record1:record){
System.out.println(record1.key());
System.out.println(record1.value());
System.out.println(record1.offset());
}
}
kafkaConsumerUtil.close();
System.out.println("done");
}
public KafkaConsumer<String, String> consumer;
public KafkaConsumerUtil(String brokerList,String topic){
Properties props = new Properties();
// 服务器ip:端口号,集群用逗号分隔
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
// // 消费者指定组,名称可以随意,注意相同消费组中的消费者只能对同一个分区消费一次
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "testmhk");
// 是否启用自动提交offset,默认true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交间隔时间1s
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// key反序列化指定类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// value反序列化指定类,注意生产者与消费者要保持一致,否则解析出问题
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void close (){
consumer.close();
}
}