- Binder 与消息中间件通信的组件
Binding是连接应用程序和消息中间件的,用于消息的消费和生产,由binder创建。
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
transactional: true
group: tx-add-bonus-group
bindings:
output:
# 用来指定topic
destination: add-bonus
默认信道 Source & Sink
DefaultSendService
- DefaultReceiveService
CodeRepo/DefaultReceiveService.java at main · 4rnold/CodeRepo
生产端
@EnableBinding(Source.class)
发送消息,source就是@EnableBinding(Source.class)中的source。
消费端
自定义信道
消费端
消费分区
# 消息驱动的配置
stream:
# SpringCloud Stream + Kafka
kafka:
binder:
brokers: 192.168.137.6:9092
auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
# SpringCloud Stream + RocketMQ
# rocketmq:
# binder:
# name-server: 127.0.0.1:9876
# 开启 stream 分区支持
instanceCount: 1 # 消费者的总数
instanceIndex: 0 # 当前消费者的索引
bindings:
# 默认发送方
output: # 这里用 Stream 给我们提供的默认 output 信道
destination: ecommerce-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
# 消息分区
producer:
# partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
partitionCount: 1 # 分区大小
# 使用自定义的分区策略, 注释掉 partitionKeyExpression
partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
partitionSelectorName: qinyiPartitionSelectorStrategy
# 默认接收方
input: # 这里用 Stream 给我们提供的默认 input 信道
destination: ecommerce-stream-client-default
group: e-commerce-qinyi-default
# 消费者开启分区支持
consumer:
partitioned: true
- PartitionKeyExtractorStrategy:提取key的策略
- PartitionSelectorStrategy:根据key决定去哪个分区
CodeRepo/QinyiPartitionSelectorStrategy.java at main · 4rnold/CodeRepo
消息过滤[不重要]
Spring Cloud Stream实现消息过滤消费_慕课手记