Spring Cloud Stream知识点盘点_慕课手记

Stream - 图1

  • Binder 与消息中间件通信的组件
  • Binding是连接应用程序和消息中间件的,用于消息的消费和生产,由binder创建

    1. stream:
    2. rocketmq:
    3. binder:
    4. name-server: 127.0.0.1:9876
    5. bindings:
    6. output:
    7. producer:
    8. transactional: true
    9. group: tx-add-bonus-group
    10. bindings:
    11. output:
    12. # 用来指定topic
    13. destination: add-bonus


    默认信道 Source & Sink

  • DefaultSendService

  • DefaultReceiveService

CodeRepo/DefaultReceiveService.java at main · 4rnold/CodeRepo

生产端

Stream - 图2
@EnableBinding(Source.class)
Stream - 图3
发送消息,source就是@EnableBinding(Source.class)中的source。

消费端

@EnableBinding(Sink.class)
Stream - 图4
Stream - 图5

自定义信道

Stream - 图7

Stream - 图8

image.png

消费端

定义接口:
image.png
配置:
image.png
消费:
image.png

消费分区

  1. # 消息驱动的配置
  2. stream:
  3. # SpringCloud Stream + Kafka
  4. kafka:
  5. binder:
  6. brokers: 192.168.137.6:9092
  7. auto-create-topics: true # 如果设置为false, 就不会自动创建Topic, 你在使用之前需要手动创建好
  8. # SpringCloud Stream + RocketMQ
  9. # rocketmq:
  10. # binder:
  11. # name-server: 127.0.0.1:9876
  12. # 开启 stream 分区支持
  13. instanceCount: 1 # 消费者的总数
  14. instanceIndex: 0 # 当前消费者的索引
  15. bindings:
  16. # 默认发送方
  17. output: # 这里用 Stream 给我们提供的默认 output 信道
  18. destination: ecommerce-stream-client-default # 消息发往的目的地, Kafka 中就是 Topic
  19. content-type: text/plain # 消息发送的格式, 接收端不用指定格式, 但是发送端要
  20. # 消息分区
  21. producer:
  22. # partitionKeyExpression: payload.author # 分区关键字, payload 指的是发送的对象, author 是对象中的属性
  23. partitionCount: 1 # 分区大小
  24. # 使用自定义的分区策略, 注释掉 partitionKeyExpression
  25. partitionKeyExtractorName: qinyiPartitionKeyExtractorStrategy
  26. partitionSelectorName: qinyiPartitionSelectorStrategy
  27. # 默认接收方
  28. input: # 这里用 Stream 给我们提供的默认 input 信道
  29. destination: ecommerce-stream-client-default
  30. group: e-commerce-qinyi-default
  31. # 消费者开启分区支持
  32. consumer:
  33. partitioned: true
  • PartitionKeyExtractorStrategy:提取key的策略
  • PartitionSelectorStrategy:根据key决定去哪个分区

CodeRepo/QinyiPartitionSelectorStrategy.java at main · 4rnold/CodeRepo

消息过滤[不重要]

Spring Cloud Stream实现消息过滤消费_慕课手记

监控

image.png

错误处理

Spring Cloud Stream错误处理详解_慕课手记