我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理。但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费。这时候我们就需要对消息进行分区处理。

使用消息分区

在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下:

  • 在消费者应用SinkReceiver中,我们对配置文件做一些修改,具体如下: ```java

    消息分组(组内只有一个实例消费)

指定了该应用实例都属于Service-A消费组

spring.cloud.stream.bindings.input.group=Service-A

指定了输入通道对应的主题名

spring.cloud.stream.bindings.input.destination=MyGroup

消息分区(具备相同特征的消息由同一个实例进行消费)

通过该参数开启消费者分区功能

spring.cloud.stream.bindings.input.consumer.partitioned=true

参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount参数 - 1

spring.cloud.stream.instance-index=0

该参数指定了当前消费者的总实例数量

spring.cloud.stream.instance-count=2

  1. 1. spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能;
  2. 1. spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量;
  3. 1. spring.cloud.stream.instanceIndex:该参数设置当前实例的索引号,从0开始,最大值 instanceCount- 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。
  4. - 在生产者应用SinkSender中,我们对配置文件也做一些修改,具体如下:
  5. ```java
  6. # 消息分组: 绑定主题
  7. spring.cloud.stream.bindings.output.destination=MyGroup
  8. # 消息分区
  9. # 通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键
  10. # 指定分区键的表达式,payload表示获取消息后,进行hash取值计算出分区的值
  11. spring.cloud.stream.bindings.output.producer.partition-key-expression=payload
  12. # 该参数指定了消息分区的数量
  13. spring.cloud.stream.bindings.output.producer.partition-count=2

从上面的配置中,我们可以看到增加了这两个参数:

  1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
  2. spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了消息分区的数量。
  3. 当前的实例payload代表的我们定义的message信息,Spring Cloud Stream默认在获取到这个message后,通过hash的方法,计算 公式: key.hashCode() % partitionCount

到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。image.png
可以看到现在只有一个实例在消费消息(每隔两秒发送一个,之前分组轮询单个实例每四秒出现,现在两秒)

自定义消息分区规则

在上文的演示中,我们通过spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload实现了消息分区,什么意思呢,当我们配置了这个属性时,当前的实例payload代表的我们定义的message信息,Spring Cloud Stream默认在获取到这个message后,通过hash的方法,计算 公式:
key.hashCode() % partitionCount

 计算出消息在哪个分区中,了解了这些,我们就可以做到了更多了。在Spring Cloud Stream中提供了实现 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy接口自定义分区策略,我们可以简单实现以下,同样的消息,我们随机的分区,但是确保分区分区的值在实例数量的范围之类,比如我有2个消费者实例,那么随机生成的分区的值在0到1之间:

/**
 * 自定义分区策略
 */
public class MyPartitionKeyStrategy implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {
        Object payload = message.getPayload();
        int i = payload.hashCode() % 2;
        //payload策略
        System.out.println("partition index: " + i);

        int r = (int) (Math.random() * 2);
        //自定义策略
        System.out.println("random partition index: " + r);
        return r;
    }
}

通过配置类的方法,注入当前的分区策略:

/**
 * 通过配置类的方法,注入分区策略
 */
@Configuration
public class MyStrategyConf {

    @Bean
    MyPartitionKeyStrategy myPartitionKeyStrategy(){
        return new MyPartitionKeyStrategy();
    }
}

修改生产者的配置,将partitionKeyExpression改成注入类的方法名称:

# 消息分组: 绑定主题
spring.cloud.stream.bindings.output.destination=MyGroup
spring.cloud.stream.bindings.output.producer.partition-key-expression=myPartitionKeyStrategy
spring.cloud.stream.bindings.output.producer.partition-count=2

发送消息

@Service
@EnableBinding({Source.class})
public class TestService {

    @Autowired
    private Source source;
    public void test(){
        User user = new User();
        for (int i = 0; i < 10; i++) {
            user.setAge(i+18);
            user.setName("nike");
            source.output().send(MessageBuilder.withPayload(user).build());
        }
    }
}

看到两个消费端收到的结果
image.png
image.png
注意:在自定义partition-key-expression后原来的payload策略就失效了

自定义主题测试

  1. 消费者-接收端 ```java public interface MySink {

    String INPUT = “my-input”;

    @Input(MySink.INPUT) SubscribableChannel input(); }

@EnableBinding({MySink.class}) @Slf4j public class SinkReceiver { @StreamListener(MySink.INPUT) public void myReceive(Object payload) { log.info(“My: Received: “ + payload); }

}


2. 消费者配置-添加配置
```java
# 这里组名可以更换
spring.cloud.stream.bindings.my-input.group=Service-A
spring.cloud.stream.bindings.my-input.destination=MyGroup
spring.cloud.stream.bindings.my-input.consumer.partitioned=true
spring.cloud.stream.bindings.my-input.binder=rabbit

spring.cloud.stream.instance-index=0
# 该参数指定了当前消费者的总实例数量 **必须,多个主题是共用的**
spring.cloud.stream.instance-count=2
  1. 发送端 ```java public interface MySource { String OUTPUT = “my-input”;

    @Output(MySource.OUTPUT) MessageChannel output(); }

@Service @EnableBinding({MySource.class}) public class TestService {

@Autowired
private MySource source;
public void test(){
    User user = new User();
    for (int i = 0; i < 10; i++) {
        user.setAge(i+18);
        user.setName("nike");
        source.output().send(MessageBuilder.withPayload(user).build());
    }
}

}


4. 发送端添加配置
```java
# 该参数指定了当前消费者的总实例数量 **必须,多个主题是共用的**
spring.cloud.stream.bindings.my-input.producer.partition-count=2
spring.cloud.stream.bindings.my-input.destination=MyGroup
# 如果配置了策略,那么这里指定策略已经失效了
#spring.cloud.stream.bindings.my-input.producer.partition-key-expression=payload

配置多个PartitionKeyExtractorStrategy,如果不指定partition-key-extractor-name会报错
image.png

Caused by: java.lang.IllegalArgumentException: Multiple  beans of type 'PartitionKeyExtractorStrategy' found.
{myPartitionKeyStrategy=com.snow.MyPartitionKeyStrategy@47c40b56, myPartitionKeyStrategy2=com.snow.MyPartitionKeyStrategy2@60b34931}. 
    Please use 'spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName' property to specify the name of the bean to be used.

指定为 spring.cloud.stream.bindings.my-input.producer.partition-key-extractor-name=myPartitionKeyStrategy2
成功进入自定义的分区策略
image.png