1. 配置类 PubsubRedisAppConfig

  1. @Configuration
  2. @Profile("pubsub")
  3. public class PubsubRedisAppConfig {
  4. /** 用于测试的通道名称 */
  5. public final static String TEST_CHANNEL_NAME = "sms_send";
  6. @Bean
  7. public LettuceConnectionFactory redisConnectionFactory() {
  8. System.out.println("使用单机版本");
  9. return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 6379));
  10. }
  11. @Bean
  12. public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  13. RedisTemplate redisTemplate = new RedisTemplate();
  14. redisTemplate.setConnectionFactory(redisConnectionFactory);
  15. // 可以配置对象的转换规则,比如使用 json 格式对 object 进行存储
  16. // object --> 序列化 --> 二进制流 --> redis-server 存储
  17. redisTemplate.setKeySerializer(new StringRedisSerializer());
  18. redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
  19. return redisTemplate;
  20. }
  21. }

2. 监听通道消息 SmsChannelListener

  1. /**
  2. * 接受消息通知,直接使用客户端的方式
  3. */
  4. @Component
  5. @Profile("pubsub")
  6. public class SmsChannelListener {
  7. @Resource
  8. private RedisTemplate redisTemplate;
  9. @PostConstruct
  10. public void setup() {
  11. redisTemplate.execute(new RedisCallback() {
  12. @Override
  13. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  14. redisConnection.subscribe(((message, pattern) -> {
  15. System.out.println("收到消息,使用 redisTemplate 收到的:" + message);
  16. }), PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes());
  17. return null;
  18. }
  19. });
  20. }
  21. }

3. 使用 Spring 的方式接收消息通知 SmsChannelListenerBySpring

  1. /**
  2. * 接收消息通知
  3. */
  4. @Component
  5. @Profile("pubsub")
  6. @Configuration
  7. public class SmsChannelListenerBySpring {
  8. // 定义监听器
  9. @Bean
  10. public RedisMessageListenerContainer smsMessageListener(RedisConnectionFactory redisConnectionFactory) {
  11. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  12. container.setConnectionFactory(redisConnectionFactory);
  13. SmsSendListener smsSendListener = new SmsSendListener();
  14. container.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisAppConfig.TEST_CHANNEL_NAME)));
  15. return container;
  16. }
  17. // 定义触发的方法
  18. class SmsSendListener implements MessageListener {
  19. @Override
  20. public void onMessage(Message message, byte[] bytes) {
  21. System.out.println("借助 spring 容器收到的消息:" + message);
  22. }
  23. }
  24. }

4. 测试类 PubsubTests

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration("classpath:applicationContext.xml")
  3. @ActiveProfiles("pubsub")
  4. public class PubsubTests {
  5. @Resource
  6. private RedisTemplate redisTemplate;
  7. @Test
  8. public void test1() throws InterruptedException {
  9. System.out.println("开始测试发布订阅机制,5 秒后发布一条消息:");
  10. Thread.sleep(5000);
  11. redisTemplate.execute(new RedisCallback() {
  12. @Override
  13. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  14. // 发送通知
  15. Long received = redisConnection.publish(PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(), "{手机号码10086~短信内容~~}".getBytes());
  16. return received;
  17. }
  18. });
  19. }
  20. // 隐藏功能 ~~ 黑科技~~当 key 删除时,或者 key 过期之后,也会有通知
  21. @Test
  22. public void test2() throws InterruptedException {
  23. redisTemplate.execute(new RedisCallback() {
  24. @Override
  25. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  26. redisConnection.subscribe(((message, bytes) -> {
  27. System.out.println("收到消息,使用 redisTemplate 收到的:" + message);
  28. }), "__keyevent@0__:del".getBytes());
  29. return null;
  30. }
  31. });
  32. redisTemplate.opsForValue().set("hkkkk", "zp");
  33. Thread.sleep(1000L);
  34. redisTemplate.delete("hkkkk");
  35. }
  36. }