1. 原理


image.png

2. 使用


1. 配置类 StreamRedisAppConfig

  1. @Configuration
  2. @Profile("stream")
  3. public class StreamRedisAppConfig {
  4. @Bean
  5. public LettuceConnectionFactory redisConnectionFactory() {
  6. System.out.println("使用单机版本");
  7. return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 6379));
  8. }
  9. @Bean
  10. public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  11. RedisTemplate redisTemplate = new RedisTemplate();
  12. redisTemplate.setConnectionFactory(redisConnectionFactory);
  13. // 可以配置对象的转换规则,比如使用json格式对object进行存储。
  14. // Object --> 序列化 --> 二进制流 --> redis-server存储
  15. redisTemplate.setKeySerializer(new StringRedisSerializer());
  16. redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
  17. return redisTemplate;
  18. }
  19. }

2. 测试类 StreamTests

  1. @RunWith(SpringJUnit4ClassRunner.class)
  2. @ContextConfiguration("classpath:applicationContext.xml")
  3. @ActiveProfiles("stream")
  4. public class StreamTests {
  5. // stream 流,5.0新特性,redisTemplate、jedis还没有支持,Redisson和Lettuce支持了
  6. // 我们使用springboot中默认的redis客户端Lettuce
  7. // 添加: XADD mystream * sensor-id 1234 temperature 19.8
  8. // 遍历: XRANGE mystream - + COUNT 2
  9. // 消费:XREAD COUNT 2 STREAMS mystream 0
  10. // 阻塞式消费: XREAD BLOCK 0 STREAMS mystream $
  11. // 创建消费者组: XGROUP CREATE mystream mygroup $
  12. // 分组消费: XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
  13. // 消费确认: XACK mystream mygroup 1526569495631-0
  14. // 查看未确认的消息: XPENDING mystream mygroup - + 10
  15. // 重新认领消费:XCLAIM mystream mygroup Alice 3600000 1526569498055-0
  16. // XINFO 查看stream信息,监控
  17. @Test
  18. public void producer() {
  19. RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
  20. StatefulRedisConnection<String, String> connect = redisClient.connect();
  21. RedisCommands<String, String> redisSyncCommands = connect.sync();
  22. redisSyncCommands.xadd("stream_sms_send", "smsid", "10001", "content", "收到短信请回复");
  23. }
  24. // 普通消费 -- 最后一条消息
  25. @Test
  26. public void consumer() {
  27. RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
  28. StatefulRedisConnection<String, String> connect = redisClient.connect();
  29. RedisCommands<String, String> redisSyncCommands = connect.sync();
  30. List<StreamMessage<String, String>> streamSmsSend = redisSyncCommands.xread(XReadArgs.StreamOffset.from("stream_sms_send", "0"));
  31. for (StreamMessage<String, String> message : streamSmsSend) {
  32. System.out.println(message);
  33. }
  34. }
  35. @Test
  36. public void createGroup() {
  37. RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
  38. StatefulRedisConnection<String, String> connect = redisClient.connect();
  39. RedisCommands<String, String> redisSyncCommands = connect.sync();
  40. // 创建分组
  41. redisSyncCommands.xgroupCreate(XReadArgs.StreamOffset.from("stream_sms_send", "0"), "group_1");
  42. }
  43. @Test
  44. public void consumerGroup() {
  45. RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
  46. StatefulRedisConnection<String, String> connect = redisClient.connect();
  47. RedisCommands<String, String> redisSyncCommands = connect.sync();
  48. // 按组消费
  49. List<StreamMessage<String, String>> xReadGroup = redisSyncCommands.xreadgroup(Consumer.from("group_1", "consumer_1"), XReadArgs.StreamOffset.lastConsumed("stream_sms_send"));
  50. for (StreamMessage<String, String> message : xReadGroup) {
  51. System.out.println(message);
  52. // 告知 redis,消息已经完成了消费
  53. redisSyncCommands.xack("stream_sms_send", "group_1", message.getId());
  54. }
  55. }
  56. }