1. 配置类 PubsubRedisAppConfig
@Configuration@Profile("pubsub")public class PubsubRedisAppConfig { /** 用于测试的通道名称 */ public final static String TEST_CHANNEL_NAME = "sms_send"; @Bean public LettuceConnectionFactory redisConnectionFactory() { System.out.println("使用单机版本"); return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 6379)); } @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 可以配置对象的转换规则,比如使用 json 格式对 object 进行存储 // object --> 序列化 --> 二进制流 --> redis-server 存储 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); return redisTemplate; }}
2. 监听通道消息 SmsChannelListener
/**
 * Receives channel notifications by subscribing directly through the Redis client
 * connection (as opposed to a Spring listener container).
 */
@Component
@Profile("pubsub")
public class SmsChannelListener {

    @Resource
    private RedisTemplate redisTemplate;

    /**
     * Subscribes to {@link PubsubRedisAppConfig#TEST_CHANNEL_NAME} once the bean has been
     * constructed and prints every message received on it.
     */
    @PostConstruct
    public void setup() {
        // Encode with an explicit charset instead of the platform default.
        final byte[] channel =
                PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // NOTE(review): the subscription is opened on the connection handed to this callback;
        // confirm it stays alive after execute() returns for the connection factory in use.
        redisTemplate.execute(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                redisConnection.subscribe(
                        (message, pattern) ->
                                System.out.println("收到消息,使用 redisTemplate 收到的:" + message),
                        channel);
                return null;
            }
        });
    }
}
3. 使用 Spring 的方式接收消息通知 SmsChannelListenerBySpring
/** * 接收消息通知 */@Component@Profile("pubsub")@Configurationpublic class SmsChannelListenerBySpring { // 定义监听器 @Bean public RedisMessageListenerContainer smsMessageListener(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); SmsSendListener smsSendListener = new SmsSendListener(); container.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisAppConfig.TEST_CHANNEL_NAME))); return container; } // 定义触发的方法 class SmsSendListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("借助 spring 容器收到的消息:" + message); } }}
4. 测试类 PubsubTests
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("classpath:applicationContext.xml")@ActiveProfiles("pubsub")public class PubsubTests { @Resource private RedisTemplate redisTemplate; @Test public void test1() throws InterruptedException { System.out.println("开始测试发布订阅机制,5 秒后发布一条消息:"); Thread.sleep(5000); redisTemplate.execute(new RedisCallback() { @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { // 发送通知 Long received = redisConnection.publish(PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(), "{手机号码10086~短信内容~~}".getBytes()); return received; } }); } // 隐藏功能 ~~ 黑科技~~当 key 删除时,或者 key 过期之后,也会有通知 @Test public void test2() throws InterruptedException { redisTemplate.execute(new RedisCallback() { @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { redisConnection.subscribe(((message, bytes) -> { System.out.println("收到消息,使用 redisTemplate 收到的:" + message); }), "__keyevent@0__:del".getBytes()); return null; } }); redisTemplate.opsForValue().set("hkkkk", "zp"); Thread.sleep(1000L); redisTemplate.delete("hkkkk"); }}