本文介绍kafka-SDK的消费者消费消息示例的使用
准备配置
安装Java依赖库
在pom.xml中添加以下依赖。
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency>
使用示例
推荐使用安服优的AfuKafkaConfig默认配置,需要先引入相关sdk详情可参照:SDK下载与使用,可以直接使用以下示例进行单Consumer订阅消息。
以下是创建单Consumer订阅消息程序AfuKafkaConsumerDemo.java。
import com.afu.common.sdk.config.AfuKafkaConfig;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;import java.util.*;public class AfuKafkaConsumerDemo {public static void main(String args[]) {AfuKafkaConfig kafkaConfig=new AfuKafkaConfig();Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBOOTSTRAP_SERVERS_CONFIG());//两次Poll之间的最大允许间隔。//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance。//(AfuKafkaConfig配置为20(s))props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfig.getSESSION_TIMEOUT_MS_CONFIG());//每次Poll的最大数量。//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。//(AfuKafkaConfig配置为20)props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getMAX_POLL_RECORDS_CONFIG());//消息的反序列化方式。props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getKEY_DESERIALIZER_CLASS_CONFIG());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getVALUE_DESERIALIZER_CLASS_CONFIG());//当前消费实例所属的消费组,请在控制台申请之后填写。//属于同一个组的消费实例,会负载消费消息。//(AfuKafkaConfig配置为"default_consumer_group")props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGROUP_ID_CONFIG());//构造消费对象,也即生成一个消费实例。KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);//设置消费组订阅的消费组id(groupId),可以订阅多个。//如果GROUP_ID_CONFIG是一样,则订阅的groupId也建议设置成一样。List<String> subscribedGroupIds = new ArrayList<>();//如果需要订阅多个GroupId,则在这里添加进去即可。//每个GroupId需要先在安服优物联云平台进行创建,您可以查看后再填入。subscribedGroupIds.add("TestGroupId");consumer.subscribe(subscribedGroupIds);//循环消费消息。while (true){try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。//建议开一个单独的线程池来消费消息,然后异步返回结果。for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset())+"value:"+record.value());}} catch (Exception e) {try {Thread.sleep(1000);} catch (Throwable ignore) {}e.printStackTrace();}}}}
当生产者发布消息后,该demo将进行单Consumer的消费
