https://www.cnblogs.com/throwable/p/11601538.html

前提#

Lettuce是一个RedisJava驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本之后替换为LettuceLettuce翻译为生菜,没错,就是吃的那种生菜,所以它的Logo长这样:

Redis高级客户端Lettuce详解 - throwable - 博客园 - 图1

既然能被Spring生态所认可,Lettuce想必有过人之处,于是笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASESpringBoot 2.1.8.RELEASEJDK [8,11]。超长警告:这篇文章断断续续花了两周完成,超过 4 万字…..

Lettuce 简介#

Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API5.1版本的新特性如下:

  • 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
  • 支持通过Brave模块跟踪Redis命令执行。
  • 支持Redis Streams
  • 支持异步的主从连接。
  • 支持异步连接池。
  • 新增命令最多执行一次模式(禁止自动重连)。
  • 全局命令超时设置(对异步和反应式命令也有效)。
  • …… 等等

注意一点Redis的版本至少需要2.6,当然越高越好,API的兼容性比较强大。

只需要引入单个依赖就可以开始愉快地使用Lettuce

  • Maven
  1. <dependency>
  2. <groupId>io.lettuce</groupId>
  3. <artifactId>lettuce-core</artifactId>
  4. <version>5.1.8.RELEASE</version>
  5. </dependency>
  • Gradle
  1. dependencies {
  2. compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
  3. }

连接 Redis#

单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息,在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例:

  • 定制的字符串URI语法:
  1. RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
  1. RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接通过构造函数实例化:
  1. RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定制的连接 URI 语法#

  • 单机(前缀为redis://
  1. 格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
  2. 完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s
  3. 简单:redis://localhost
  • 单机并且使用SSL(前缀为rediss://) <== 注意后面多了个s
  1. 格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
  2. 完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s
  3. 简单:rediss://localhost
  • 单机Unix Domain Sockets模式(前缀为redis-socket://
  1. 格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
  2. 完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(前缀为redis-sentinel://
  1. 格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
  2. 完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超时时间单位:

  • d 天
  • h 小时
  • m 分钟
  • s 秒钟
  • ms 毫秒
  • us 微秒
  • ns 纳秒

个人建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,但是比较容易出现人为错误。鉴于笔者没有SSLUnix Domain Socket的使用场景,下面不对这两种连接方式进行列举。

基本使用#

Lettuce使用的时候依赖于四个主要组件:

  • RedisURI:连接信息。
  • RedisClientRedis客户端,特殊地,集群连接有一个定制的RedisClusterClient
  • ConnectionRedis连接,主要是StatefulConnection或者StatefulRedisConnection的子类,连接的类型主要由连接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
  • RedisCommandsRedis命令API接口,基本上覆盖了Redis发行版本的所有命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会经常跟RedisCommands系列接口打交道。

一个基本使用例子如下:

  1. @Test
  2. public void testSetGet() throws Exception {
  3. RedisURI redisUri = RedisURI.builder()
  4. .withHost("localhost")
  5. .withPort(6379)
  6. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  7. .build();
  8. RedisClient redisClient = RedisClient.create(redisUri);
  9. StatefulRedisConnection<String, String> connection = redisClient.connect();
  10. RedisCommands<String, String> redisCommands = connection.sync();
  11. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  12. String result = redisCommands.set("name", "throwable", setArgs);
  13. Assertions.assertThat(result).isEqualToIgnoringCase("OK");
  14. result = redisCommands.get("name");
  15. Assertions.assertThat(result).isEqualTo("throwable");
  16. connection.close();
  17. redisClient.shutdown();
  18. }

注意:

  • <5>:关闭连接一般在应用程序停止之前操作,一个应用程序中的一个Redis驱动实例不需要太多的连接(一般情况下只需要一个连接实例就可以,如果有多个连接的需要可以考虑使用连接池,其实Redis目前处理命令的模块是单线程,在客户端多个连接多线程调用理论上没有效果)。
  • <6>:关闭客户端一般应用程序停止之前操作,如果条件允许的话,基于后开先闭原则,客户端关闭应该在连接关闭之后操作。

API#

Lettuce主要提供三种API

  • 同步(sync):RedisCommands
  • 异步(async):RedisAsyncCommands
  • 反应式(reactive):RedisReactiveCommands

先准备好一个单机Redis连接备用:

  1. private static StatefulRedisConnection<String, String> CONNECTION;
  2. private static RedisClient CLIENT;
  3. @BeforeClass
  4. public static void beforeClass() {
  5. RedisURI redisUri = RedisURI.builder()
  6. .withHost("localhost")
  7. .withPort(6379)
  8. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  9. .build();
  10. CLIENT = RedisClient.create(redisUri);
  11. CONNECTION = CLIENT.connect();
  12. }
  13. @AfterClass
  14. public static void afterClass() throws Exception {
  15. CONNECTION.close();
  16. CLIENT.shutdown();
  17. }

Redis命令API的具体实现可以直接从StatefulRedisConnection实例获取,见其接口定义:

  1. public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {
  2. boolean isMulti();
  3. RedisCommands<K, V> sync();
  4. RedisAsyncCommands<K, V> async();
  5. RedisReactiveCommands<K, V> reactive();
  6. }

值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient创建的StatefulRedisConnection实例一般是泛型实例StatefulRedisConnection<String,String>,也就是所有命令APIKEYVALUE都是String类型,这种使用方式能满足大部分的使用场景。当然,必要的时候可以定制编码解码器RedisCodec<K,V>

同步 API#

先构建RedisCommands实例:

  1. private static RedisCommands<String, String> COMMAND;
  2. @BeforeClass
  3. public static void beforeClass() {
  4. COMMAND = CONNECTION.sync();
  5. }

基本使用:

  1. @Test
  2. public void testSyncPing() throws Exception {
  3. String pong = COMMAND.ping();
  4. Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
  5. }
  6. @Test
  7. public void testSyncSetAndGet() throws Exception {
  8. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  9. COMMAND.set("name", "throwable", setArgs);
  10. String value = COMMAND.get("name");
  11. log.info("Get value: {}", value);
  12. }

同步API在所有命令调用之后会立即返回结果。如果熟悉Jedis的话,RedisCommands的用法其实和它相差不大。

异步 API#

先构建RedisAsyncCommands实例:

  1. private static RedisAsyncCommands<String, String> ASYNC_COMMAND;
  2. @BeforeClass
  3. public static void beforeClass() {
  4. ASYNC_COMMAND = CONNECTION.async();
  5. }

基本使用:

  1. @Test
  2. public void testAsyncPing() throws Exception {
  3. RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();
  4. log.info("Ping result:{}", redisFuture.get());
  5. }

RedisAsyncCommands所有方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义如下:

  1. public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {
  2. String getError();
  3. boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  4. }

也就是,RedisFuture可以无缝使用Future或者JDK1.8 中引入的CompletableFuture提供的方法。举个例子:

  1. @Test
  2. public void testAsyncSetAndGet1() throws Exception {
  3. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  4. RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);
  5. future.thenAccept(value -> log.info("Set命令返回:{}", value));
  6. future.get();
  7. }
  8. @Test
  9. public void testAsyncSetAndGet2() throws Exception {
  10. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  11. CompletableFuture<Void> result =
  12. (CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs)
  13. .thenAcceptBoth(ASYNC_COMMAND.get("name"),
  14. (s, g) -> {
  15. log.info("Set命令返回:{}", s);
  16. log.info("Get命令返回:{}", g);
  17. });
  18. result.get();
  19. }

如果能熟练使用CompletableFuture和函数式编程技巧,可以组合多个RedisFuture完成一些列复杂的操作。

反应式 API#

Lettuce引入的反应式编程框架是Project Reactor,如果没有反应式编程经验可以先自行了解一下Project Reactor

构建RedisReactiveCommands实例:

  1. private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;
  2. @BeforeClass
  3. public static void beforeClass() {
  4. REACTIVE_COMMAND = CONNECTION.reactive();
  5. }

根据Project ReactorRedisReactiveCommands的方法如果返回的结果只包含 0 或 1 个元素,那么返回值类型是Mono,如果返回的结果包含 0 到 N(N 大于 0)个元素,那么返回值是Flux。举个例子:

  1. @Test
  2. public void testReactivePing() throws Exception {
  3. Mono<String> ping = REACTIVE_COMMAND.ping();
  4. ping.subscribe(v -> log.info("Ping result:{}", v));
  5. Thread.sleep(1000);
  6. }
  7. @Test
  8. public void testReactiveSetAndGet() throws Exception {
  9. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  10. REACTIVE_COMMAND.set("name", "throwable", setArgs).block();
  11. REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));
  12. Thread.sleep(1000);
  13. }
  14. @Test
  15. public void testReactiveSet() throws Exception {
  16. REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();
  17. Flux<String> flux = REACTIVE_COMMAND.smembers("food");
  18. flux.subscribe(log::info);
  19. REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();
  20. Thread.sleep(1000);
  21. }

举个更加复杂的例子,包含了事务、函数转换等:

  1. @Test
  2. public void testReactiveFunctional() throws Exception {
  3. REACTIVE_COMMAND.multi().doOnSuccess(r -> {
  4. REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();
  5. REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();
  6. }).flatMap(s -> REACTIVE_COMMAND.exec())
  7. .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded()))
  8. .subscribe();
  9. Thread.sleep(1000);
  10. }

这个方法开启一个事务,先把counter设置为 1,再将counter自增 1。

发布和订阅#

非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection,两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()

  • 非集群模式:
  1. RedisClient client = ...
  2. StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
  3. connection.addListener(new RedisPubSubListener<String, String>() { ... });
  4. RedisPubSubCommands<String, String> sync = connection.sync();
  5. sync.subscribe("channel");
  6. RedisPubSubAsyncCommands<String, String> async = connection.async();
  7. RedisFuture<Void> future = async.subscribe("channel");
  8. RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
  9. reactive.subscribe("channel").subscribe();
  10. reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 集群模式:
  1. RedisClusterClient clusterClient = ...
  2. StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
  3. connection.addListener(new RedisPubSubListener<String, String>() { ... });
  4. RedisPubSubCommands<String, String> sync = connection.sync();
  5. sync.subscribe("channel");

这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:

  1. @Test
  2. public void testSyncKeyspaceNotification() throws Exception {
  3. RedisURI redisUri = RedisURI.builder()
  4. .withHost("localhost")
  5. .withPort(6379)
  6. .withDatabase(0)
  7. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  8. .build();
  9. RedisClient redisClient = RedisClient.create(redisUri);
  10. StatefulRedisConnection<String, String> redisConnection = redisClient.connect();
  11. RedisCommands<String, String> redisCommands = redisConnection.sync();
  12. redisCommands.configSet("notify-keyspace-events", "Ex");
  13. StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
  14. connection.addListener(new RedisPubSubAdapter<>() {
  15. @Override
  16. public void psubscribed(String pattern, long count) {
  17. log.info("pattern:{},count:{}", pattern, count);
  18. }
  19. @Override
  20. public void message(String pattern, String channel, String message) {
  21. log.info("pattern:{},channel:{},message:{}", pattern, channel, message);
  22. }
  23. });
  24. RedisPubSubCommands<String, String> commands = connection.sync();
  25. commands.psubscribe("__keyevent@0__:expired");
  26. redisCommands.setex("name", 2, "throwable");
  27. Thread.sleep(10000);
  28. redisConnection.close();
  29. connection.close();
  30. redisClient.shutdown();
  31. }

实际上,在实现RedisPubSubListener的时候可以单独抽离,尽量不要设计成匿名内部类的形式。

事务和批量命令执行#

事务相关的命令就是WATCHUNWATCHEXECMULTIDISCARD,在RedisCommands系列接口中有对应的方法。举个例子:

  1. @Test
  2. public void testSyncMulti() throws Exception {
  3. COMMAND.multi();
  4. COMMAND.setex("name-1", 2, "throwable");
  5. COMMAND.setex("name-2", 2, "doge");
  6. TransactionResult result = COMMAND.exec();
  7. int index = 0;
  8. for (Object r : result) {
  9. log.info("Result-{}:{}", index, r);
  10. index++;
  11. }
  12. }

RedisPipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞 IO)通讯,所以它的做法是客户端缓存将要发送的命令,最后需要触发然后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。PipelineLettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,换言之可以这样理解:NettyLettuce从底层实现了RedisPipeline机制。但是,Lettuce的异步API也提供了手动Flush的方法:

  1. @Test
  2. public void testAsyncManualFlush() {
  3. ASYNC_COMMAND.setAutoFlushCommands(false);
  4. List<RedisFuture<?>> redisFutures = Lists.newArrayList();
  5. int count = 5000;
  6. for (int i = 0; i < count; i++) {
  7. String key = "key-" + (i + 1);
  8. String value = "value-" + (i + 1);
  9. redisFutures.add(ASYNC_COMMAND.set(key, value));
  10. redisFutures.add(ASYNC_COMMAND.expire(key, 2));
  11. }
  12. long start = System.currentTimeMillis();
  13. ASYNC_COMMAND.flushCommands();
  14. boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));
  15. Assertions.assertThat(result).isTrue();
  16. log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
  17. }

上面只是从文档看到的一些理论术语,但是现实是骨感的,对比了下JedisPipeline提供的方法,发现了JedisPipeline执行耗时比较低:

  1. @Test
  2. public void testJedisPipeline() throws Exception {
  3. Jedis jedis = new Jedis();
  4. Pipeline pipeline = jedis.pipelined();
  5. int count = 5000;
  6. for (int i = 0; i < count; i++) {
  7. String key = "key-" + (i + 1);
  8. String value = "value-" + (i + 1);
  9. pipeline.set(key, value);
  10. pipeline.expire(key, 2);
  11. }
  12. long start = System.currentTimeMillis();
  13. pipeline.syncAndReturnAll();
  14. log.info("Jedis cost:{} ms", System.currentTimeMillis() - start);
  15. }

个人猜测Lettuce可能底层并非合并所有命令一次发送(甚至可能是单条发送),具体可能需要抓包才能定位。依此来看,如果真的有大量执行Redis命令的场景,不妨可以使用JedisPipeline

注意:由上面的测试推断RedisTemplateexecutePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。

Lua 脚本执行#

Lettuce中执行RedisLua命令的同步接口如下:

  1. public interface RedisScriptingCommands<K, V> {
  2. <T> T eval(String var1, ScriptOutputType var2, K... var3);
  3. <T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);
  4. <T> T evalsha(String var1, ScriptOutputType var2, K... var3);
  5. <T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);
  6. List<Boolean> scriptExists(String... var1);
  7. String scriptFlush();
  8. String scriptKill();
  9. String scriptLoad(V var1);
  10. String digest(V var1);
  11. }

异步和反应式的接口方法定义差不多,不同的地方就是返回值类型,一般我们常用的是eval()evalsha()scriptLoad()方法。举个简单的例子:

  1. private static RedisCommands<String, String> COMMANDS;
  2. private static String RAW_LUA = "local key = KEYS[1]\n" +
  3. "local value = ARGV[1]\n" +
  4. "local timeout = ARGV[2]\n" +
  5. "redis.call('SETEX', key, tonumber(timeout), value)\n" +
  6. "local result = redis.call('GET', key)\n" +
  7. "return result;";
  8. private static AtomicReference<String> LUA_SHA = new AtomicReference<>();
  9. @Test
  10. public void testLua() throws Exception {
  11. LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));
  12. String[] keys = new String[]{"name"};
  13. String[] args = new String[]{"throwable", "5000"};
  14. String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);
  15. log.info("Get value:{}", result);
  16. }

高可用和分片#

为了Redis的高可用,一般会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅做了主从复制,故障需要手动切换)、哨兵和集群。普通主从模式可以独立运行,也可以配合哨兵运行,只是哨兵提供自动故障转移和主节点提升功能。普通主从和哨兵都可以使用MasterSlave,通过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。

这里注意一点MasterSlave中提供的方法如果只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;如果要求传入一个RedisURI集合,那么对于普通主从模式来说所有节点信息是静态的,不会进行发现和更新。

拓扑发现的规则如下:

  • 对于普通主从(Master/Replica)模式,不需要感知RedisURI指向从节点还是主节点,只会进行一次性的拓扑查找所有节点信息,此后节点信息会保存在静态缓存中,不会更新。
  • 对于哨兵模式,会订阅所有哨兵实例并侦听订阅 / 发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵天然就是动态发现节点信息,不支持静态配置。

拓扑发现机制的提供APITopologyProvider,需要了解其原理的可以参考具体的实现。

对于集群(Cluster)模式,Lettuce提供了一套独立的API

另外,如果Lettuce连接面向的是非单个Redis节点,连接实例提供了数据读取节点偏好ReadFrom)设置,可选值有:

  • MASTER:只从Master节点中读取。
  • MASTER_PREFERRED:优先从Master节点中读取。
  • SLAVE_PREFERRED:优先从Slavor节点中读取。
  • SLAVE:只从Slavor节点中读取。
  • NEAREST:使用最近一次连接的Redis实例读取。

普通主从模式#

假设现在有三个Redis服务形成树状主从关系如下:

  • 节点一:localhost:6379,角色为 Master。
  • 节点二:localhost:6380,角色为 Slavor,节点一的从节点。
  • 节点三:localhost:6381,角色为 Slavor,节点二的从节点。

首次动态节点发现主从模式的节点信息需要如下构建连接:

  1. @Test
  2. public void testDynamicReplica() throws Exception {
  3. RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  4. RedisClient redisClient = RedisClient.create(uri);
  5. StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);
  6. connection.setReadFrom(ReadFrom.SLAVE);
  7. connection.close();
  8. redisClient.shutdown();
  9. }

如果需要指定静态的Redis主从节点连接属性,那么可以这样构建连接:

  1. @Test
  2. public void testStaticReplica() throws Exception {
  3. List<RedisURI> uris = new ArrayList<>();
  4. RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();
  5. RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();
  6. RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();
  7. uris.add(uri1);
  8. uris.add(uri2);
  9. uris.add(uri3);
  10. RedisClient redisClient = RedisClient.create();
  11. StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,
  12. new Utf8StringCodec(), uris);
  13. connection.setReadFrom(ReadFrom.MASTER);
  14. connection.close();
  15. redisClient.shutdown();
  16. }

哨兵模式#

由于Lettuce自身提供了哨兵的拓扑发现机制,所以只需要随便配置一个哨兵节点的RedisURI实例即可:

  1. @Test
  2. public void testDynamicSentinel() throws Exception {
  3. RedisURI redisUri = RedisURI.builder()
  4. .withPassword("你的密码")
  5. .withSentinel("localhost", 26379)
  6. .withSentinelMasterId("哨兵Master的ID")
  7. .build();
  8. RedisClient redisClient = RedisClient.create();
  9. StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);
  10. connection.setReadFrom(ReadFrom.SLAVE);
  11. RedisCommands<String, String> command = connection.sync();
  12. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  13. command.set("name", "throwable", setArgs);
  14. String value = command.get("name");
  15. log.info("Get value:{}", value);
  16. }

集群模式#

鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用本身就有比较多的限制,所以这里只简单介绍一下怎么用。先说几个特性:

下面的 API 提供跨槽位(Slot)调用的功能

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

静态节点选择功能:

  • masters:选择所有主节点执行命令。
  • slaves:选择所有从节点执行命令,其实就是只读模式。
  • all nodes:命令可以在所有节点执行。

集群拓扑视图动态更新功能:

  • 手动更新,主动调用RedisClusterClient#reloadPartitions()
  • 后台定时更新。
  • 自适应更新,基于连接断开和MOVED/ASK命令重定向自动更新。

Redis集群搭建详细过程可以参考官方文档,假设已经搭建好集群如下(192.168.56.200是笔者的虚拟机 Host):

  • 192.168.56.200:7001 => 主节点,槽位 0-5460。
  • 192.168.56.200:7002 => 主节点,槽位 5461-10922。
  • 192.168.56.200:7003 => 主节点,槽位 10923-16383。
  • 192.168.56.200:7004 => 7001 的从节点。
  • 192.168.56.200:7005 => 7002 的从节点。
  • 192.168.56.200:7006 => 7003 的从节点。

简单的集群连接和使用方式如下:

  1. @Test
  2. public void testSyncCluster(){
  3. RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();
  4. RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
  5. StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
  6. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
  7. commands.setex("name",10, "throwable");
  8. String value = commands.get("name");
  9. log.info("Get value:{}", value);
  10. }

节点选择:

  1. @Test
  2. public void testSyncNodeSelection() {
  3. RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
  4. RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
  5. StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
  6. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
  7. NodeSelection<String, String> replicas = commands.slaves();
  8. NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();
  9. Executions<List<String>> keys = nodeSelectionCommands.keys("*");
  10. keys.forEach(key -> log.info("key: {}", key));
  11. connection.close();
  12. redisClusterClient.shutdown();
  13. }

定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):

  1. @Test
  2. public void testPeriodicClusterTopology() throws Exception {
  3. RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
  4. RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
  5. ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions
  6. .builder()
  7. .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES))
  8. .build();
  9. redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
  10. StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
  11. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
  12. commands.setex("name", 10, "throwable");
  13. String value = commands.get("name");
  14. log.info("Get value:{}", value);
  15. Thread.sleep(Integer.MAX_VALUE);
  16. connection.close();
  17. redisClusterClient.shutdown();
  18. }

自适应更新集群拓扑视图:

  1. @Test
  2. public void testAdaptiveClusterTopology() throws Exception {
  3. RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();
  4. RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);
  5. ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
  6. .enableAdaptiveRefreshTrigger(
  7. ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
  8. ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
  9. )
  10. .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS))
  11. .build();
  12. redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());
  13. StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
  14. RedisAdvancedClusterCommands<String, String> commands = connection.sync();
  15. commands.setex("name", 10, "throwable");
  16. String value = commands.get("name");
  17. log.info("Get value:{}", value);
  18. Thread.sleep(Integer.MAX_VALUE);
  19. connection.close();
  20. redisClusterClient.shutdown();
  21. }

动态命令和自定义命令#

自定义命令是Redis命令有限集,不过可以更细粒度指定KEYARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:

  1. @Test
  2. public void testCustomPing() throws Exception {
  3. RedisURI redisUri = RedisURI.builder()
  4. .withHost("localhost")
  5. .withPort(6379)
  6. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  7. .build();
  8. RedisClient redisClient = RedisClient.create(redisUri);
  9. StatefulRedisConnection<String, String> connect = redisClient.connect();
  10. RedisCommands<String, String> sync = connect.sync();
  11. RedisCodec<String, String> codec = StringCodec.UTF8;
  12. String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));
  13. log.info("PING:{}", result);
  14. connect.close();
  15. redisClient.shutdown();
  16. }
  17. @Test
  18. public void testCustomSet() throws Exception {
  19. RedisURI redisUri = RedisURI.builder()
  20. .withHost("localhost")
  21. .withPort(6379)
  22. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  23. .build();
  24. RedisClient redisClient = RedisClient.create(redisUri);
  25. StatefulRedisConnection<String, String> connect = redisClient.connect();
  26. RedisCommands<String, String> sync = connect.sync();
  27. RedisCodec<String, String> codec = StringCodec.UTF8;
  28. sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),
  29. new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));
  30. String result = sync.get("name");
  31. log.info("Get value:{}", result);
  32. connect.close();
  33. redisClient.shutdown();
  34. }

动态命令是基于Redis命令有限集,并且通过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:

  1. public interface CustomCommand extends Commands {
  2. @Command("SET ?0 ?1")
  3. String setKey(String key, String value);
  4. @Command("SET :key :value")
  5. String setKeyNamed(@Param("key") String key, @Param("value") String value);
  6. @Command("MGET ?0 ?1")
  7. List<String> mGet(String key1, String key2);
  8. @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)
  9. String mSet(String key1, String value1, String key2, String value2);
  10. }
  11. @Test
  12. public void testCustomDynamicSet() throws Exception {
  13. RedisURI redisUri = RedisURI.builder()
  14. .withHost("localhost")
  15. .withPort(6379)
  16. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  17. .build();
  18. RedisClient redisClient = RedisClient.create(redisUri);
  19. StatefulRedisConnection<String, String> connect = redisClient.connect();
  20. RedisCommandFactory commandFactory = new RedisCommandFactory(connect);
  21. CustomCommand commands = commandFactory.getCommands(CustomCommand.class);
  22. commands.setKey("name", "throwable");
  23. commands.setKeyNamed("throwable", "doge");
  24. log.info("MGET ===> " + commands.mGet("name", "throwable"));
  25. commands.mSet("key1", "value1","key2", "value2");
  26. log.info("MGET ===> " + commands.mGet("key1", "key2"));
  27. connect.close();
  28. redisClient.shutdown();
  29. }

高阶特性#

Lettuce有很多高阶使用特性,这里只列举个人认为常用的两点:

  • 配置客户端资源。
  • 使用连接池。

更多其他特性可以自行参看官方文档。

配置客户端资源#

客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroupsEventExecutorGroup),这些线程池或者线程组是连接程序的基础组件。一般情况下,客户端资源应该在多个Redis客户端之间共享,并且在不再使用的时候需要自行关闭。笔者认为,客户端资源是面向Netty的。注意:除非特别熟悉或者花长时间去测试调整下面提到的参数,否则在没有经验的前提下凭直觉修改默认值,有可能会踩坑。

客户端资源接口是ClientResources,实现类是DefaultClientResources

构建DefaultClientResources实例:

  1. ClientResources resources = DefaultClientResources.create();
  2. ClientResources resources = DefaultClientResources.builder()
  3. .ioThreadPoolSize(4)
  4. .computationThreadPoolSize(4)
  5. .build()

使用:

  1. ClientResources resources = DefaultClientResources.create();
  2. RedisClient client = RedisClient.create(resources, uri);
  3. RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
  4. client.shutdown();
  5. clusterClient.shutdown();
  6. resources.shutdown();

客户端资源基本配置:

属性 描述 默认值
ioThreadPoolSize I/O线程数 Runtime.getRuntime().availableProcessors()
computationThreadPoolSize 任务线程数 Runtime.getRuntime().availableProcessors()

客户端资源高级配置:

属性 描述 默认值
eventLoopGroupProvider EventLoopGroup提供商 -
eventExecutorGroupProvider EventExecutorGroup提供商 -
eventBus 事件总线 DefaultEventBus
commandLatencyCollectorOptions 命令延时收集器配置 DefaultCommandLatencyCollectorOptions
commandLatencyCollector 命令延时收集器 DefaultCommandLatencyCollector
commandLatencyPublisherOptions 命令延时发布器配置 DefaultEventPublisherOptions
dnsResolver DNS处理器 JDK 或者Netty提供
reconnectDelay 重连延时配置 Delay.exponential()
nettyCustomizer Netty自定义配置器 -
tracing 轨迹记录器 -

非集群客户端RedisClient的属性配置:

Redis非集群客户端RedisClient本身提供了配置属性方法:

  1. RedisClient client = RedisClient.create(uri);
  2. client.setOptions(ClientOptions.builder()
  3. .autoReconnect(false)
  4. .pingBeforeActivateConnection(true)
  5. .build());

非集群客户端的配置属性列表:

属性 描述 默认值
pingBeforeActivateConnection 连接激活之前是否执行PING命令 false
autoReconnect 是否自动重连 true
cancelCommandsOnReconnectFailure 重连失败是否拒绝命令执行 false
suspendReconnectOnProtocolFailure 底层协议失败是否挂起重连操作 false
requestQueueSize 请求队列容量 2147483647(Integer#MAX_VALUE)
disconnectedBehavior 失去连接时候的行为 DEFAULT
sslOptions SSL 配置 -
socketOptions Socket配置 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions 超时配置 -
publishOnScheduler 发布反应式信号数据的调度器 使用I/O线程

集群客户端属性配置:

Redis集群客户端RedisClusterClient本身提供了配置属性方法:

  1. RedisClusterClient client = RedisClusterClient.create(uri);
  2. ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
  3. .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES))
  4. .enableAllAdaptiveRefreshTriggers()
  5. .build();
  6. client.setOptions(ClusterClientOptions.builder()
  7. .topologyRefreshOptions(topologyRefreshOptions)
  8. .build());

集群客户端的配置属性列表:

属性 描述 默认值
enablePeriodicRefresh 是否允许周期性更新集群拓扑视图 false
refreshPeriod 更新集群拓扑视图周期 60 秒
enableAdaptiveRefreshTrigger 设置自适应更新集群拓扑视图触发器RefreshTrigger -
adaptiveRefreshTriggersTimeout 自适应更新集群拓扑视图触发器超时设置 30 秒
refreshTriggersReconnectAttempts 自适应更新集群拓扑视图触发重连次数 5
dynamicRefreshSources 是否允许动态刷新拓扑资源 true
closeStaleConnections 是否允许关闭陈旧的连接 true
maxRedirects 集群重定向次数上限 5
validateClusterNodeMembership 是否校验集群节点的成员关系 true

使用连接池#

引入连接池依赖commons-pool2

  1. <dependency>
  2. <groupId>org.apache.commons</groupId>
  3. <artifactId>commons-pool2</artifactId>
  4. <version>2.7.0</version>
  5. </dependency

基本使用如下:

  1. @Test
  2. public void testUseConnectionPool() throws Exception {
  3. RedisURI redisUri = RedisURI.builder()
  4. .withHost("localhost")
  5. .withPort(6379)
  6. .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
  7. .build();
  8. RedisClient redisClient = RedisClient.create(redisUri);
  9. GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
  10. GenericObjectPool<StatefulRedisConnection<String, String>> pool
  11. = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);
  12. try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {
  13. RedisCommands<String, String> command = connection.sync();
  14. SetArgs setArgs = SetArgs.Builder.nx().ex(5);
  15. command.set("name", "throwable", setArgs);
  16. String n = command.get("name");
  17. log.info("Get value:{}", n);
  18. }
  19. pool.close();
  20. redisClient.shutdown();
  21. }

其中,同步连接的池化支持需要用ConnectionPoolSupport,异步连接的池化支持需要用AsyncConnectionPoolSupportLettuce5.1 之后才支持)。

几个常见的渐进式删除例子#

渐进式删除 Hash 中的域 - 属性:

  1. @Test
  2. public void testDelBigHashKey() throws Exception {
  3. ScanArgs scanArgs = ScanArgs.Builder.limit(2);
  4. ScanCursor cursor = ScanCursor.INITIAL;
  5. String key = "BIG_HASH_KEY";
  6. prepareHashTestData(key);
  7. log.info("开始渐进式删除Hash的元素...");
  8. int counter = 0;
  9. do {
  10. MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);
  11. cursor = ScanCursor.of(result.getCursor());
  12. cursor.setFinished(result.isFinished());
  13. Collection<String> fields = result.getMap().values();
  14. if (!fields.isEmpty()) {
  15. COMMAND.hdel(key, fields.toArray(new String[0]));
  16. }
  17. counter++;
  18. } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
  19. log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter);
  20. }
  21. private void prepareHashTestData(String key) throws Exception {
  22. COMMAND.hset(key, "1", "1");
  23. COMMAND.hset(key, "2", "2");
  24. COMMAND.hset(key, "3", "3");
  25. COMMAND.hset(key, "4", "4");
  26. COMMAND.hset(key, "5", "5");
  27. }

渐进式删除集合中的元素:

  1. @Test
  2. public void testDelBigSetKey() throws Exception {
  3. String key = "BIG_SET_KEY";
  4. prepareSetTestData(key);
  5. ScanArgs scanArgs = ScanArgs.Builder.limit(2);
  6. ScanCursor cursor = ScanCursor.INITIAL;
  7. log.info("开始渐进式删除Set的元素...");
  8. int counter = 0;
  9. do {
  10. ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);
  11. cursor = ScanCursor.of(result.getCursor());
  12. cursor.setFinished(result.isFinished());
  13. List<String> values = result.getValues();
  14. if (!values.isEmpty()) {
  15. COMMAND.srem(key, values.toArray(new String[0]));
  16. }
  17. counter++;
  18. } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
  19. log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter);
  20. }
  21. private void prepareSetTestData(String key) throws Exception {
  22. COMMAND.sadd(key, "1", "2", "3", "4", "5");
  23. }

渐进式删除有序集合中的元素:

  1. @Test
  2. public void testDelBigZSetKey() throws Exception {
  3. ScanArgs scanArgs = ScanArgs.Builder.limit(2);
  4. ScanCursor cursor = ScanCursor.INITIAL;
  5. String key = "BIG_ZSET_KEY";
  6. prepareZSetTestData(key);
  7. log.info("开始渐进式删除ZSet的元素...");
  8. int counter = 0;
  9. do {
  10. ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);
  11. cursor = ScanCursor.of(result.getCursor());
  12. cursor.setFinished(result.isFinished());
  13. List<ScoredValue<String>> scoredValues = result.getValues();
  14. if (!scoredValues.isEmpty()) {
  15. COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));
  16. }
  17. counter++;
  18. } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));
  19. log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter);
  20. }
  21. private void prepareZSetTestData(String key) throws Exception {
  22. COMMAND.zadd(key, 0, "1");
  23. COMMAND.zadd(key, 0, "2");
  24. COMMAND.zadd(key, 0, "3");
  25. COMMAND.zadd(key, 0, "4");
  26. COMMAND.zadd(key, 0, "5");
  27. }

在 SpringBoot 中使用 Lettuce#

个人认为,spring-data-redis中的API封装并不是很优秀,用起来比较重,不够灵活,这里结合前面的例子和代码,在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖:

  1. <dependencyManagement>
  2. <dependencies>
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-dependencies</artifactId>
  6. <version>2.1.8.RELEASE</version>
  7. <type>pom</type>
  8. <scope>import</scope>
  9. </dependency>
  10. </dependencies>
  11. </dependencyManagement>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>io.lettuce</groupId>
  19. <artifactId>lettuce-core</artifactId>
  20. <version>5.1.8.RELEASE</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.projectlombok</groupId>
  24. <artifactId>lombok</artifactId>
  25. <version>1.18.10</version>
  26. <scope>provided</scope>
  27. </dependency>
  28. </dependencies>

一般情况下,每个应用应该使用单个Redis客户端实例和单个连接实例,这里设计一个脚手架,适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源,采用默认的实现即可。对于Redis的连接属性,比较主要的有HostPortPassword,其他可以暂时忽略。基于约定大于配置的原则,先定制一系列属性配置类(其实有些配置是可以完全共用,但是考虑到要清晰描述类之间的关系,这里拆分多个配置属性类和多个配置方法):

  1. @Data
  2. @ConfigurationProperties(prefix = "lettuce")
  3. public class LettuceProperties {
  4. private LettuceSingleProperties single;
  5. private LettuceReplicaProperties replica;
  6. private LettuceSentinelProperties sentinel;
  7. private LettuceClusterProperties cluster;
  8. }
  9. @Data
  10. public class LettuceSingleProperties {
  11. private String host;
  12. private Integer port;
  13. private String password;
  14. }
  15. @EqualsAndHashCode(callSuper = true)
  16. @Data
  17. public class LettuceReplicaProperties extends LettuceSingleProperties {
  18. }
  19. @EqualsAndHashCode(callSuper = true)
  20. @Data
  21. public class LettuceSentinelProperties extends LettuceSingleProperties {
  22. private String masterId;
  23. }
  24. @EqualsAndHashCode(callSuper = true)
  25. @Data
  26. public class LettuceClusterProperties extends LettuceSingleProperties {
  27. }

配置类如下,主要使用@ConditionalOnProperty做隔离,一般情况下,很少有人会在一个应用使用一种以上的Redis连接场景:

  1. @RequiredArgsConstructor
  2. @Configuration
  3. @ConditionalOnClass(name = "io.lettuce.core.RedisURI")
  4. @EnableConfigurationProperties(value = LettuceProperties.class)
  5. public class LettuceAutoConfiguration {
  6. private final LettuceProperties lettuceProperties;
  7. @Bean(destroyMethod = "shutdown")
  8. public ClientResources clientResources() {
  9. return DefaultClientResources.create();
  10. }
  11. @Bean
  12. @ConditionalOnProperty(name = "lettuce.single.host")
  13. public RedisURI singleRedisUri() {
  14. LettuceSingleProperties singleProperties = lettuceProperties.getSingle();
  15. return RedisURI.builder()
  16. .withHost(singleProperties.getHost())
  17. .withPort(singleProperties.getPort())
  18. .withPassword(singleProperties.getPassword())
  19. .build();
  20. }
  21. @Bean(destroyMethod = "shutdown")
  22. @ConditionalOnProperty(name = "lettuce.single.host")
  23. public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {
  24. return RedisClient.create(clientResources, redisUri);
  25. }
  26. @Bean(destroyMethod = "close")
  27. @ConditionalOnProperty(name = "lettuce.single.host")
  28. public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {
  29. return singleRedisClient.connect();
  30. }
  31. @Bean
  32. @ConditionalOnProperty(name = "lettuce.replica.host")
  33. public RedisURI replicaRedisUri() {
  34. LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();
  35. return RedisURI.builder()
  36. .withHost(replicaProperties.getHost())
  37. .withPort(replicaProperties.getPort())
  38. .withPassword(replicaProperties.getPassword())
  39. .build();
  40. }
  41. @Bean(destroyMethod = "shutdown")
  42. @ConditionalOnProperty(name = "lettuce.replica.host")
  43. public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {
  44. return RedisClient.create(clientResources, redisUri);
  45. }
  46. @Bean(destroyMethod = "close")
  47. @ConditionalOnProperty(name = "lettuce.replica.host")
  48. public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,
  49. @Qualifier("replicaRedisUri") RedisURI redisUri) {
  50. return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);
  51. }
  52. @Bean
  53. @ConditionalOnProperty(name = "lettuce.sentinel.host")
  54. public RedisURI sentinelRedisUri() {
  55. LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();
  56. return RedisURI.builder()
  57. .withPassword(sentinelProperties.getPassword())
  58. .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort())
  59. .withSentinelMasterId(sentinelProperties.getMasterId())
  60. .build();
  61. }
  62. @Bean(destroyMethod = "shutdown")
  63. @ConditionalOnProperty(name = "lettuce.sentinel.host")
  64. public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {
  65. return RedisClient.create(clientResources, redisUri);
  66. }
  67. @Bean(destroyMethod = "close")
  68. @ConditionalOnProperty(name = "lettuce.sentinel.host")
  69. public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,
  70. @Qualifier("sentinelRedisUri") RedisURI redisUri) {
  71. return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);
  72. }
  73. @Bean
  74. @ConditionalOnProperty(name = "lettuce.cluster.host")
  75. public RedisURI clusterRedisUri() {
  76. LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();
  77. return RedisURI.builder()
  78. .withHost(clusterProperties.getHost())
  79. .withPort(clusterProperties.getPort())
  80. .withPassword(clusterProperties.getPassword())
  81. .build();
  82. }
  83. @Bean(destroyMethod = "shutdown")
  84. @ConditionalOnProperty(name = "lettuce.cluster.host")
  85. public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {
  86. return RedisClusterClient.create(clientResources, redisUri);
  87. }
  88. @Bean(destroyMethod = "close")
  89. @ConditionalOnProperty(name = "lettuce.cluster")
  90. public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {
  91. return clusterClient.connect();
  92. }
  93. }

最后为了让IDE识别我们的配置,可以添加IDE亲缘性,/META-INF文件夹下新增一个文件spring-configuration-metadata.json,内容如下:

  1. {
  2. "properties": [
  3. {
  4. "name": "lettuce.single",
  5. "type": "club.throwable.spring.lettuce.LettuceSingleProperties",
  6. "description": "单机配置",
  7. "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
  8. },
  9. {
  10. "name": "lettuce.replica",
  11. "type": "club.throwable.spring.lettuce.LettuceReplicaProperties",
  12. "description": "主从配置",
  13. "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
  14. },
  15. {
  16. "name": "lettuce.sentinel",
  17. "type": "club.throwable.spring.lettuce.LettuceSentinelProperties",
  18. "description": "哨兵配置",
  19. "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
  20. },
  21. {
  22. "name": "lettuce.single",
  23. "type": "club.throwable.spring.lettuce.LettuceClusterProperties",
  24. "description": "集群配置",
  25. "sourceType": "club.throwable.spring.lettuce.LettuceProperties"
  26. }
  27. ]
  28. }

如果想IDE亲缘性做得更好,可以添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用如下:

  1. @Slf4j
  2. @Component
  3. public class RedisCommandLineRunner implements CommandLineRunner {
  4. @Autowired
  5. @Qualifier("singleRedisConnection")
  6. private StatefulRedisConnection<String, String> connection;
  7. @Override
  8. public void run(String... args) throws Exception {
  9. RedisCommands<String, String> redisCommands = connection.sync();
  10. redisCommands.setex("name", 5, "throwable");
  11. log.info("Get value:{}", redisCommands.get("name"));
  12. }
  13. }

小结#

本文算是基于Lettuce的官方文档,对它的使用进行全方位的分析,包括主要功能、配置都做了一些示例,限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳作为官方的Redis客户端驱动,所以值得信赖,它的一些API设计确实比较合理,扩展性高的同时灵活性也高。个人建议,基于Lettuce包自行添加配置到SpringBoot应用用起来会得心应手,毕竟RedisTemplate实在太笨重,而且还屏蔽了Lettuce一些高级特性和灵活的API

参考资料:

链接#

(本文完 c-14-d e-a-20190928 最近事太多…)

技术公众号(《Throwable 文摘》),不定期推送笔者原创技术文章(绝不抄袭或者转载):

Redis高级客户端Lettuce详解 - throwable - 博客园 - 图2
https://www.cnblogs.com/throwable/p/11601538.html