1. 配置类 PubsubRedisAppConfig
@Configuration
@Profile("pubsub")
public class PubsubRedisAppConfig {
/** 用于测试的通道名称 */
public final static String TEST_CHANNEL_NAME = "sms_send";
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
System.out.println("使用单机版本");
return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 6379));
}
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 可以配置对象的转换规则,比如使用 json 格式对 object 进行存储
// object --> 序列化 --> 二进制流 --> redis-server 存储
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
return redisTemplate;
}
}
2. 监听通道消息 SmsChannelListener
/**
* 接受消息通知,直接使用客户端的方式
*/
@Component
@Profile("pubsub")
public class SmsChannelListener {
@Resource
private RedisTemplate redisTemplate;
@PostConstruct
public void setup() {
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.subscribe(((message, pattern) -> {
System.out.println("收到消息,使用 redisTemplate 收到的:" + message);
}), PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes());
return null;
}
});
}
}
3. 使用 Spring 的方式接收消息通知 SmsChannelListenerBySpring
/**
* 接收消息通知
*/
@Component
@Profile("pubsub")
@Configuration
public class SmsChannelListenerBySpring {
// 定义监听器
@Bean
public RedisMessageListenerContainer smsMessageListener(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
SmsSendListener smsSendListener = new SmsSendListener();
container.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisAppConfig.TEST_CHANNEL_NAME)));
return container;
}
// 定义触发的方法
class SmsSendListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("借助 spring 容器收到的消息:" + message);
}
}
}
4. 测试类 PubsubTests
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
@ActiveProfiles("pubsub")
public class PubsubTests {
@Resource
private RedisTemplate redisTemplate;
@Test
public void test1() throws InterruptedException {
System.out.println("开始测试发布订阅机制,5 秒后发布一条消息:");
Thread.sleep(5000);
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
// 发送通知
Long received = redisConnection.publish(PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(), "{手机号码10086~短信内容~~}".getBytes());
return received;
}
});
}
// 隐藏功能 ~~ 黑科技~~当 key 删除时,或者 key 过期之后,也会有通知
@Test
public void test2() throws InterruptedException {
redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.subscribe(((message, bytes) -> {
System.out.println("收到消息,使用 redisTemplate 收到的:" + message);
}), "__keyevent@0__:del".getBytes());
return null;
}
});
redisTemplate.opsForValue().set("hkkkk", "zp");
Thread.sleep(1000L);
redisTemplate.delete("hkkkk");
}
}