1. 原理
2. 使用
1. 配置类 StreamRedisAppConfig
@Configuration@Profile("stream")public class StreamRedisAppConfig { @Bean public LettuceConnectionFactory redisConnectionFactory() { System.out.println("使用单机版本"); return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 6379)); } @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 可以配置对象的转换规则,比如使用json格式对object进行存储。 // Object --> 序列化 --> 二进制流 --> redis-server存储 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); return redisTemplate; }}
2. 测试类 StreamTests
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("classpath:applicationContext.xml")@ActiveProfiles("stream")public class StreamTests { // stream 流,5.0新特性,redisTemplate、jedis还没有支持,Redisson和Lettuce支持了 // 我们使用springboot中默认的redis客户端Lettuce // 添加: XADD mystream * sensor-id 1234 temperature 19.8 // 遍历: XRANGE mystream - + COUNT 2 // 消费:XREAD COUNT 2 STREAMS mystream 0 // 阻塞式消费: XREAD BLOCK 0 STREAMS mystream $ // 创建消费者组: XGROUP CREATE mystream mygroup $ // 分组消费: XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > // 消费确认: XACK mystream mygroup 1526569495631-0 // 查看未确认的消息: XPENDING mystream mygroup - + 10 // 重新认领消费:XCLAIM mystream mygroup Alice 3600000 1526569498055-0 // XINFO 查看stream信息,监控 @Test public void producer() { RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379"); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> redisSyncCommands = connect.sync(); redisSyncCommands.xadd("stream_sms_send", "smsid", "10001", "content", "收到短信请回复"); } // 普通消费 -- 最后一条消息 @Test public void consumer() { RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379"); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> redisSyncCommands = connect.sync(); List<StreamMessage<String, String>> streamSmsSend = redisSyncCommands.xread(XReadArgs.StreamOffset.from("stream_sms_send", "0")); for (StreamMessage<String, String> message : streamSmsSend) { System.out.println(message); } } @Test public void createGroup() { RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379"); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> redisSyncCommands = connect.sync(); // 创建分组 redisSyncCommands.xgroupCreate(XReadArgs.StreamOffset.from("stream_sms_send", "0"), "group_1"); } @Test public void consumerGroup() { RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379"); StatefulRedisConnection<String, String> connect = redisClient.connect(); RedisCommands<String, String> redisSyncCommands = connect.sync(); // 按组消费 List<StreamMessage<String, String>> xReadGroup = redisSyncCommands.xreadgroup(Consumer.from("group_1", "consumer_1"), XReadArgs.StreamOffset.lastConsumed("stream_sms_send")); for (StreamMessage<String, String> message : xReadGroup) { System.out.println(message); // 告知 redis,消息已经完成了消费 redisSyncCommands.xack("stream_sms_send", "group_1", message.getId()); } }}