本文介绍kafka-SDK的消费者消费消息示例的使用
准备配置
安装Java依赖库
在pom.xml中添加以下依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
使用示例
推荐使用安服优的AfuKafkaConfig默认配置,需要先引入相关sdk详情可参照:SDK下载与使用,可以直接使用以下示例进行单Consumer订阅消息。
以下是创建单Consumer订阅消息程序AfuKafkaConsumerDemo.java。
import com.afu.common.sdk.config.AfuKafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.time.Duration;
import java.util.*;
public class AfuKafkaConsumerDemo {
public static void main(String args[]) {
AfuKafkaConfig kafkaConfig=new AfuKafkaConfig();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBOOTSTRAP_SERVERS_CONFIG());
//两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance。
//(AfuKafkaConfig配置为20(s))
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfig.getSESSION_TIMEOUT_MS_CONFIG());
//每次Poll的最大数量。
//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
//(AfuKafkaConfig配置为20)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getMAX_POLL_RECORDS_CONFIG());
//消息的反序列化方式。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getKEY_DESERIALIZER_CLASS_CONFIG());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getVALUE_DESERIALIZER_CLASS_CONFIG());
//当前消费实例所属的消费组,请在控制台申请之后填写。
//属于同一个组的消费实例,会负载消费消息。
//(AfuKafkaConfig配置为"default_consumer_group")
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGROUP_ID_CONFIG());
//构造消费对象,也即生成一个消费实例。
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
//设置消费组订阅的消费组id(groupId),可以订阅多个。
//如果GROUP_ID_CONFIG是一样,则订阅的groupId也建议设置成一样。
List<String> subscribedGroupIds = new ArrayList<>();
//如果需要订阅多个GroupId,则在这里添加进去即可。
//每个GroupId需要先在安服优物联云平台进行创建,您可以查看后再填入。
subscribedGroupIds.add("TestGroupId");
consumer.subscribe(subscribedGroupIds);
//循环消费消息。
while (true){
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//必须在下次Poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
//建议开一个单独的线程池来消费消息,然后异步返回结果。
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset())+"value:"+record.value());
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}
当生产者发布消息后,该demo将进行单Consumer的消费