DEV-AROUND-AMAP-IN-REACT-第 2 页.png

情报

Rocket 5.0之前的数据库配置主要在 rocket_contrib 包。支持的数据库包括Mysql、Postgres、Mongodb、Neo4j、Redis、Memcache等。

Rocket 5.0之后,rocket_contrib 包就⚠️deprecated。其中大部分模块都被转移到了rocket本体包中,数据库databases模块的内容转移到了新包 rocket_sync_db_pools 中。Mysql、Redis、Mongodb被移除,因其本身已经提供了异步驱动的支持。(具体内容请移步ChangeLog

不过大部分人不会直接去关注ChangeLog,所以有了这样的【issue】Support for redis in 0.5.0 #1471Jeb Rosen的回复大概就是说最新版本的redis已经足够异步,rocket没有必要继续“套娃”了!

总之,现在的问题就是,怎么在rocket中使用redis?

步骤

少废话,上代码!

接下来展示在Rocket 5.0中怎么使用redis

step1:初始化demo项目

  1. cargo new demo-rust-rocket-redis

step2:处理依赖

修改Cargo.toml文件,添加依赖

  1. [package]
  2. name = "demo-rust-rocket-redis"
  3. version = "0.1.0"
  4. edition = "2018"
  5. # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
  6. [dependencies]
  7. redis = { version = "0.21.2", features = ["r2d2"] }
  8. r2d2 = { version="0.8.9" }
  9. [dependencies.rocket]
  10. version = "0.5.0-rc.1"

step3:初始化main.rs

清空main.rs文件的内容,然后把rocket官方示例的最基本demo粘到main.rs里面

  1. #[macro_use] extern crate rocket;
  2. #[get("/")]
  3. fn hello() -> &'static str {
  4. "Hello, world!"
  5. }
  6. #[launch]
  7. fn rocket() -> _ {
  8. rocket::build().mount("/", routes![hello])
  9. }

step4:配置连接池

直接在main.rs中配置redis的连接池,然后让rocket接管(manage)

  1. #[macro_use] extern crate rocket;
  2. #[get("/")]
  3. fn hello() -> &'static str {
  4. "Hello, world!"
  5. }
  6. #[launch]
  7. fn rocket() -> _ {
  8. // 创建连接池
  9. let manager = redis::Client::open("redis://127.0.0.1:6379").unwrap();
  10. let pool = r2d2::Pool::builder().max_size(15).build(manager).unwrap();
  11. // rocket接收连接池为全局状态
  12. rocket::build()
  13. .manage(pool)
  14. .mount("/", routes![hello])
  15. }

step5:使用链接池

在请求中使用redis链接池main.rs

  1. use std::ops::{Deref, DerefMut};
  2. use r2d2::Pool;
  3. use redis::Client;
  4. use rocket::State;
  5. #[macro_use] extern crate rocket;
  6. #[get("/")]
  7. fn hello() -> &'static str {
  8. "Hello, world!"
  9. }
  10. #[get("/get?<key>")]
  11. fn get(key:String,redis_pool:&State<Pool<Client>>) -> String {
  12. let pool = redis_pool.inner();
  13. let mut pconn = pool.get().unwrap();
  14. let conn = pconn.deref_mut();
  15. let get_result = redis::Cmd::get(key).query::<String>(conn);
  16. get_result.expect("redis cmd exec error")
  17. }
  18. #[get("/set?<key>&<value>")]
  19. fn set(key:String,value:String,redis_pool:&State<Pool<Client>>) -> String {
  20. let pool = redis_pool.deref();
  21. let mut pconn = pool.get().unwrap();
  22. let conn = pconn.deref_mut();
  23. let set_result = redis::Cmd::set(key,value).query::<String>(conn);
  24. set_result.expect("redis cmd exec error")
  25. }
  26. #[launch]
  27. fn rocket() -> _ {
  28. let client = redis::Client::open("redis://127.0.0.1:6379").unwrap();
  29. let pool = r2d2::Pool::builder().max_size(15).build(client).unwrap();
  30. rocket::build()
  31. .manage(pool)
  32. .mount("/", routes![hello])
  33. .mount("/redis", routes![get,set])
  34. }

step6:启动服务

  1. % cargo run
  2. Compiling demo-rust-rocket-redis v0.1.0 (/Users/theti/mystack/infra/demo-rust-rocket-redis)
  3. Finished dev [unoptimized + debuginfo] target(s) in 8.28s
  4. Running `target/debug/demo-rust-rocket-redis`
  5. 🔧 Configured for debug.
  6. >> address: 127.0.0.1
  7. >> port: 8000
  8. >> workers: 8
  9. >> ident: Rocket
  10. >> keep-alive: 5s
  11. >> limits: bytes = 8KiB, data-form = 2MiB, file = 1MiB, form = 32KiB, json = 1MiB, msgpack = 1MiB, string = 8KiB
  12. >> tls: disabled
  13. >> temp dir: /var/folders/_c/9wlkncwj3fxd1tm_r6t7s22c0000gp/T/
  14. >> log level: normal
  15. >> cli colors: true
  16. >> shutdown: ctrlc = true, force = true, signals = [SIGTERM], grace = 2s, mercy = 3s
  17. 🛰 Routes:
  18. >> (hello) GET /
  19. >> (get) GET /redis/get?<key>
  20. >> (set) GET /redis/set?<key>&<value>
  21. 📡 Fairings:
  22. >> Shield (liftoff, response, singleton)
  23. 🛡️ Shield:
  24. >> X-Frame-Options: SAMEORIGIN
  25. >> Permissions-Policy: interest-cohort=()
  26. >> X-Content-Type-Options: nosniff
  27. 🚀 Rocket has launched from http://127.0.0.1:8000
  28. GET /redis/set?key=hello&value=rust:
  29. >> Matched: (set) GET /redis/set?<key>&<value>
  30. >> Outcome: Success
  31. >> Response succeeded.
  32. GET /redis/get?key=hello:
  33. >> Matched: (get) GET /redis/get?<key>
  34. >> Outcome: Success
  35. >> Response succeeded.

step7:Redis Set

调用set接口,尝试向redis中添加一条键值对hello:rust,返回Ok。
截屏2021-09-09 下午1.02.31.png

step8:Redis Get

再调用get接口,向Redis查询我们刚刚添加的key,返回rust截屏2021-09-09 下午1.02.45.png

step9:再检查

可以看到我们成功向本地Redis中添加了一条记录
截屏2021-09-09 下午1.06.28.png

结论

所以,上述解决方案的“底层原理”是什么?

基础能力一: Rocket state 机制

Rocket提供了一套state机制帮助维护一些全局可用的“状态量”。像访问数记录、任务队列或者数据库连接。

基础能力二: r2d2 通用连接池

通过维护一个连接池,解决数据库创建/断开连接过于频繁带来的无效损耗。

为什么要用连接池?因为是web服务,所以应该至少要有这样的能力吧!

基础能力三: redis-rs feature=[“r2d2”] Redis连接库

基于Rust实现的Redis客户端连接库,提供了基础的Redis连接能力,并且开启featurer2d2后, redis-rs具备接入r2d2的能力。

能力聚合一: redis-rs + r2d2 => 连接池对象

r2d2提供的能力就是通用连接池r2d2::Pool

想要接入连接池:

  • 我们首先需要一个实现了 Trait ManageConnection 的结构体(假定FooManager)。ManageConnection抽象地表达了一个连接的”生命周期”。

    1. pub trait ManageConnection: Send + Sync + 'static {
    2. type Connection: Send + 'static;
    3. type Error: Error + 'static;
    4. fn connect(&self) -> Result<Self::Connection, Self::Error>;
    5. fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
    6. fn has_broken(&self, conn: &mut Self::Connection) -> bool;
    7. }
  • 然后将我们的FooManager对象传入连接池, 然后就可以得到一个FooManager的连接池

    1. let pool:Pool<FooManager> = r2d2::Pool::builder()
    2. .max_size(15) // 设置池大小
    3. .build(manager) // 传入ManageConnection对象
    4. .unwarp()

    所以,总的来说就是两步走,先创建一个实现了r2d2::ManageConnection的对象,然后将其传入连接池的建造方法r2d2::Builder,最后得到了池对象r2d2::Pool<T:r2d2::ManageConnection>

好消息是redis-rsr2d2feature提供了r2d2连接池的支持。

  1. use crate::{ConnectionLike, RedisError};
  2. // 实现 r2d2::ManageConnection 的宏
  3. macro_rules! impl_manage_connection {
  4. ($client:ty, $connection:ty) => {
  5. impl r2d2::ManageConnection for $client {
  6. type Connection = $connection;
  7. type Error = RedisError;
  8. fn connect(&self) -> Result<Self::Connection, Self::Error> {
  9. self.get_connection()
  10. }
  11. fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
  12. if conn.check_connection() {
  13. Ok(())
  14. } else {
  15. Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
  16. }
  17. }
  18. fn has_broken(&self, conn: &mut Self::Connection) -> bool {
  19. !conn.is_open()
  20. }
  21. }
  22. };
  23. }
  24. // 调用宏,生成 trait impl
  25. impl_manage_connection!(crate::Client, crate::Connection);
  26. #[cfg(feature = "cluster")]
  27. impl_manage_connection!(
  28. crate::cluster::ClusterClient,
  29. crate::cluster::ClusterConnection
  30. );

看一眼源码,这里定义了一个实现r2d2::ManageConnectionTrait的宏(macro)impl_manage_connection,然后调用它为 redis::Client 以及 redis::Connection实现了Traitr2d2::ManageConnection, 并且在集群(redis-cluster)模式下,也为集群客户端提供了连接池的实现支持。

换言之,redis::Client就是我们接入r2d2所需要的那个FooManager。构建一个支持Redis的r2d2连接池的代码最终如下所示:

  1. let manager = redis::Client::open("redis://127.0.0.1:6379").unwrap();
  2. let pool = r2d2::Pool::builder().max_size(15).build(manager).unwrap();

能力聚合二: state + 连接池 => 全局可用的连接池

将 r2d2-redis 连接池作为全局状态(state)由rocket接管,这样在每一个请求中就可以按需获取。

这里需要解决两个问题: (1) Rocket State怎么用? (2)如何使用在真实请求中使用这个连接池?

根据官方教程 ManagedState,两步:

  1. 初始化状态值,调用manage,将状态值转交给rocket,变更全局状态值
  2. 在任意需要访问state的请求中,加上&State<T>参数,就可以获取rocket接管的全局状态值

第一步, 很好解决, 创建一个在上一部分(能力聚合一)得到的pool对象,然后调用manage(pool),将这个连接池交给rocket作为全局状态量接管。

第二步,在请求中添加参数redis_pool:&State<Pool<Client>>,得到全局状态量,连接池。但是,正如第二个问题所言,怎么在真实的请求中使用这个连接池?

我们的最终的目标是使用redis-rs提供的能力与Redis进行交互,而目前我们能够拿到的是redis_pool, 一个引用状态连接池对象&State<Pool<Client>>

redis-rs提供了两种与Redis进行交互API:

  • 低级API: redis::cmd("SET").arg("my_key").arg(42).query(con)**?**;
  • 高级API: con.set("my_key", 42)**?**;

其中con: &mut redis::Connection,所以无论哪种方式我们都需要一个redis::Connection对象。

State实现了Deref Trait,由此通过redis_pool.deref()可以拿到连接池&Pool<Client> (State inner()函数也可以,两者实现完全一样)。得到r2d2连接池以后,我们的最终目标当然是从连接池中拿到我们需要的Redis连接对象redis::Connection

  1. let pool:&Pool<Client> = redis_pool.deref();
  2. // 调用r2d2连接池提供的连接获取方法 get
  3. let mut pconn:PooledConnection<Client> = pool.get().unwrap();

然而,连接池提供的连接获取方法get,得到的结果还不是我们想要的redis::Connection
看一眼 PooledConnection 的部分源码:

  1. impl<M> DerefMut for PooledConnection<M>
  2. where
  3. M: ManageConnection,
  4. {
  5. fn deref_mut(&mut self) -> &mut M::Connection {
  6. &mut self.conn.as_mut().unwrap().conn
  7. }
  8. }

这里实现了ManageConnection的是redis::Client,那么Connection不就是redis::Connection(参考redis-rs的r2d2feature,如下所示)

  1. // impl_manage_connection!(crate::Client, crate::Connection);
  2. impl r2d2::ManageConnection for $client {
  3. type Connection = $connection;
  4. type Error = RedisError;
  5. ...
  6. }

所以,绕了这么一大圈,再调用一下pconn.deref_mut()也就得到了&mut redis::Connection,目标达成!

接着用这个连接对象执行redis-rs提供的API就可以。

展望

本文所给出的Rocket集成redis方案,整体工作量小,依赖少,而且依赖方redis-rs以及r2d2都比较稳定可靠。

当然,这不是唯一的解法。就整体调研中所见识到的解法还包括 rocket_db_poolrocket_redis_r2d2(rocket 0.3 不适用于现在)等。

最后,对于本文所给出的demo是否适用于真实的生产环境,扛住性能以及安全的考验,还有待测试。