我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理。但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费。这时候我们就需要对消息进行分区处理。
使用消息分区
在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下:
指定了该应用实例都属于Service-A消费组
spring.cloud.stream.bindings.input.group=Service-A
指定了输入通道对应的主题名
spring.cloud.stream.bindings.input.destination=MyGroup
消息分区(具备相同特征的消息由同一个实例进行消费)
通过该参数开启消费者分区功能
spring.cloud.stream.bindings.input.consumer.partitioned=true
参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount参数 - 1
spring.cloud.stream.instance-index=0
该参数指定了当前消费者的总实例数量
spring.cloud.stream.instance-count=2
1. spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能;1. spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量;1. spring.cloud.stream.instanceIndex:该参数设置当前实例的索引号,从0开始,最大值 instanceCount- 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。- 在生产者应用SinkSender中,我们对配置文件也做一些修改,具体如下:```java# 消息分组: 绑定主题spring.cloud.stream.bindings.output.destination=MyGroup# 消息分区# 通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键# 指定分区键的表达式,payload表示获取消息后,进行hash取值计算出分区的值spring.cloud.stream.bindings.output.producer.partition-key-expression=payload# 该参数指定了消息分区的数量spring.cloud.stream.bindings.output.producer.partition-count=2
从上面的配置中,我们可以看到增加了这两个参数:
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
- spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了消息分区的数量。
- 当前的实例payload代表的我们定义的message信息,Spring Cloud Stream默认在获取到这个message后,通过hash的方法,计算 公式: key.hashCode() % partitionCount
到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。
可以看到现在只有一个实例在消费消息(每隔两秒发送一个,之前分组轮询单个实例每四秒出现,现在两秒)
自定义消息分区规则
在上文的演示中,我们通过spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload实现了消息分区,什么意思呢,当我们配置了这个属性时,当前的实例payload代表的我们定义的message信息,Spring Cloud Stream默认在获取到这个message后,通过hash的方法,计算 公式:
key.hashCode() % partitionCount
计算出消息在哪个分区中,了解了这些,我们就可以做到了更多了。在Spring Cloud Stream中提供了实现 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy接口自定义分区策略,我们可以简单实现以下,同样的消息,我们随机的分区,但是确保分区分区的值在实例数量的范围之类,比如我有2个消费者实例,那么随机生成的分区的值在0到1之间:
/**
* 自定义分区策略
*/
public class MyPartitionKeyStrategy implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
Object payload = message.getPayload();
int i = payload.hashCode() % 2;
//payload策略
System.out.println("partition index: " + i);
int r = (int) (Math.random() * 2);
//自定义策略
System.out.println("random partition index: " + r);
return r;
}
}
通过配置类的方法,注入当前的分区策略:
/**
* 通过配置类的方法,注入分区策略
*/
@Configuration
public class MyStrategyConf {
@Bean
MyPartitionKeyStrategy myPartitionKeyStrategy(){
return new MyPartitionKeyStrategy();
}
}
修改生产者的配置,将partitionKeyExpression改成注入类的方法名称:
# 消息分组: 绑定主题
spring.cloud.stream.bindings.output.destination=MyGroup
spring.cloud.stream.bindings.output.producer.partition-key-expression=myPartitionKeyStrategy
spring.cloud.stream.bindings.output.producer.partition-count=2
发送消息
@Service
@EnableBinding({Source.class})
public class TestService {
@Autowired
private Source source;
public void test(){
User user = new User();
for (int i = 0; i < 10; i++) {
user.setAge(i+18);
user.setName("nike");
source.output().send(MessageBuilder.withPayload(user).build());
}
}
}
看到两个消费端收到的结果

注意:在自定义partition-key-expression后原来的payload策略就失效了
自定义主题测试
消费者-接收端 ```java public interface MySink {
String INPUT = “my-input”;
@Input(MySink.INPUT) SubscribableChannel input(); }
@EnableBinding({MySink.class}) @Slf4j public class SinkReceiver { @StreamListener(MySink.INPUT) public void myReceive(Object payload) { log.info(“My: Received: “ + payload); }
}
2. 消费者配置-添加配置
```java
# 这里组名可以更换
spring.cloud.stream.bindings.my-input.group=Service-A
spring.cloud.stream.bindings.my-input.destination=MyGroup
spring.cloud.stream.bindings.my-input.consumer.partitioned=true
spring.cloud.stream.bindings.my-input.binder=rabbit
spring.cloud.stream.instance-index=0
# 该参数指定了当前消费者的总实例数量 **必须,多个主题是共用的**
spring.cloud.stream.instance-count=2
发送端 ```java public interface MySource { String OUTPUT = “my-input”;
@Output(MySource.OUTPUT) MessageChannel output(); }
@Service @EnableBinding({MySource.class}) public class TestService {
@Autowired
private MySource source;
public void test(){
User user = new User();
for (int i = 0; i < 10; i++) {
user.setAge(i+18);
user.setName("nike");
source.output().send(MessageBuilder.withPayload(user).build());
}
}
}
4. 发送端添加配置
```java
# 该参数指定了当前消费者的总实例数量 **必须,多个主题是共用的**
spring.cloud.stream.bindings.my-input.producer.partition-count=2
spring.cloud.stream.bindings.my-input.destination=MyGroup
# 如果配置了策略,那么这里指定策略已经失效了
#spring.cloud.stream.bindings.my-input.producer.partition-key-expression=payload
配置多个PartitionKeyExtractorStrategy,如果不指定partition-key-extractor-name会报错
Caused by: java.lang.IllegalArgumentException: Multiple beans of type 'PartitionKeyExtractorStrategy' found.
{myPartitionKeyStrategy=com.snow.MyPartitionKeyStrategy@47c40b56, myPartitionKeyStrategy2=com.snow.MyPartitionKeyStrategy2@60b34931}.
Please use 'spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName' property to specify the name of the bean to be used.
指定为 spring.cloud.stream.bindings.my-input.producer.partition-key-extractor-name=myPartitionKeyStrategy2
成功进入自定义的分区策略
