pom配置

  <?xml version="1.0" encoding="UTF-8"?>
  <!--
    Maven module descriptor for the demo-stream-rabbit-consumer service.
    No dependency declares a version here; versions are expected to come from
    the dependencyManagement of the springcloud-demo parent (TODO confirm).
  -->
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
      <groupId>demo</groupId>
      <artifactId>springcloud-demo</artifactId>
      <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>demo-stream-rabbit-consumer</artifactId>

    <dependencies>
      <!-- Web (Spring MVC) -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <!-- Spring Cloud Stream with the RabbitMQ binder -->
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      </dependency>

      <!-- Sentinel -->
      <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
      </dependency>

      <!-- Health monitoring (Actuator) -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
      </dependency>

      <!-- Nacos service discovery and configuration -->
      <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
      </dependency>
      <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
      </dependency>

      <!-- MyBatis integration with Spring Boot -->
      <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
      </dependency>

      <!-- MySQL driver -->
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
      </dependency>

      <!-- Druid connection pool -->
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
      </dependency>

      <!-- JDBC -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
      </dependency>

      <!-- Development-time tooling; optional so it is not passed on transitively -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
      </dependency>

      <!-- Lombok is only needed at compile time; optional keeps it out of dependants -->
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
      </dependency>

      <!-- Test-only dependencies -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
      </dependency>
      <!-- Explicit test scope so JUnit does not end up on the runtime classpath -->
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
      </dependency>
    </dependencies>
  </project>

yaml配置

  # Application configuration for the demo-stream-rabbit-consumer service.
  # NOTE(review): credentials below are plain text; consider externalising them
  # (environment variables or a config server) before sharing this file.
  server:
    port: 8004
  spring:
    application:
      name: demo-stream-rabbit-consumer
    datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      # NOTE(review): with MySQL Connector/J 8.x this class name is deprecated in
      # favour of com.mysql.cj.jdbc.Driver - confirm the driver version in use.
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://192.168.2.20:30569/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: root
      password: root
    cloud:
      nacos:
        server-addr: 192.168.2.20:30010
        username: nacos
        password: nacos
      sentinel:
        transport:
          dashboard: 192.168.2.20:31596 # Address of the Sentinel dashboard
          # port: 8719 # Port used for the dedicated connection to Sentinel
          # client-ip: 192.168.2.6 # This machine's IP; must be set when Sentinel runs in a virtual machine
          # port: 30195 # Port used for the dedicated connection to Sentinel
          # client-ip: 192.168.2.20 # This machine's IP; must be set when Sentinel runs in a virtual machine
      stream:
        binders: # RabbitMQ services to bind to are configured here
          defaultRabbit: # Binder name, referenced by the bindings below
            type: rabbit # Message middleware type
            # environment: # Binder-specific RabbitMQ environment settings
            #   spring:
            #     rabbitmq:
            #       host: 192.168.2.20
            #       port: 31672
            #       username: user
            #       password: ze2tgb2WGC
        bindings: # Binding definitions
          output: # Channel name
            destination: studyExchange # Name of the exchange to use
            content-type: application/json # Message content type; JSON here, use "text/plain" for plain text
            binder: defaultRabbit # Binder (message service) this binding uses
            group: default-group
          input: # Channel name
            destination: studyExchange # Name of the exchange to use
            binder: defaultRabbit # Binder (message service) this binding uses
            group: default-group
    rabbitmq:
      host: 192.168.2.20
      port: 31672
      username: user
      password: ze2tgb2WGC
  # Expose application (Actuator) endpoints
  management:
    endpoints:
      web:
        exposure:
          include: '*'

示例:listener

  1. package com.test.rabbit.consumer.common.listener;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.annotation.StreamListener;
  5. import org.springframework.cloud.stream.messaging.Sink;
  6. import org.springframework.messaging.Message;
  7. /**
  8. * @author jia
  9. * @since 2022-04-05 12:27
  10. */
  11. @EnableBinding(Sink.class)
  12. @Slf4j
  13. public class ReceiveMessageListener {
  14. @StreamListener(Sink.INPUT)
  15. public void input(Message<String> message) {
  16. log.info("receive message is {}", message);
  17. }
  18. }

参考

https://github.com/spring-cloud/spring-cloud-stream-samples/