- 情报
- 步骤
- 结论
- state 机制">基础能力一: Rocket state 机制
- r2d2 通用连接池">基础能力二: r2d2 通用连接池
- redis-rs feature=[“r2d2”] Redis连接库">基础能力三: redis-rs feature=[“r2d2”] Redis连接库
- 能力聚合一: redis-rs + r2d2 => 连接池对象
- 能力聚合二: state + 连接池 => 全局可用的连接池
- 展望
情报
Rocket 5.0之前的数据库配置主要在 rocket_contrib 包。支持的数据库包括Mysql、Postgres、Mongodb、Neo4j、Redis、Memcache等。
Rocket 5.0之后,rocket_contrib 包就⚠️deprecated。其中大部分模块都被转移到了rocket本体包中,数据库databases模块的内容转移到了新包 rocket_sync_db_pools 中。Mysql、Redis、Mongodb被移除,因其本身已经提供了异步驱动的支持。(具体内容请移步ChangeLog)
不过大部分人不会直接去关注ChangeLog,所以有了这样的【issue】Support for redis in 0.5.0 #1471。 Jeb Rosen的回复大概就是说最新版本的redis已经足够异步,rocket没有必要继续“套娃”了!
步骤
少废话,上代码!
step1:初始化demo项目
cargo new demo-rust-rocket-redis
step2:处理依赖
修改Cargo.toml文件,添加依赖
[package]name = "demo-rust-rocket-redis"version = "0.1.0"edition = "2018"# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies]redis = { version = "0.21.2", features = ["r2d2"] }r2d2 = { version="0.8.9" }[dependencies.rocket]version = "0.5.0-rc.1"
step3:初始化main.rs
清空main.rs文件的内容,然后把rocket官方示例的最基本demo粘到main.rs里面
#[macro_use] extern crate rocket;#[get("/")]fn hello() -> &'static str {"Hello, world!"}#[launch]fn rocket() -> _ {rocket::build().mount("/", routes![hello])}
step4:配置连接池
直接在main.rs中配置redis的连接池,然后让rocket接管(manage)
#[macro_use] extern crate rocket;#[get("/")]fn hello() -> &'static str {"Hello, world!"}#[launch]fn rocket() -> _ {// 创建连接池let manager = redis::Client::open("redis://127.0.0.1:6379").unwrap();let pool = r2d2::Pool::builder().max_size(15).build(manager).unwrap();// rocket接收连接池为全局状态rocket::build().manage(pool).mount("/", routes![hello])}
step5:使用链接池
在请求中使用redis链接池main.rs
use std::ops::{Deref, DerefMut};use r2d2::Pool;use redis::Client;use rocket::State;#[macro_use] extern crate rocket;#[get("/")]fn hello() -> &'static str {"Hello, world!"}#[get("/get?<key>")]fn get(key:String,redis_pool:&State<Pool<Client>>) -> String {let pool = redis_pool.inner();let mut pconn = pool.get().unwrap();let conn = pconn.deref_mut();let get_result = redis::Cmd::get(key).query::<String>(conn);get_result.expect("redis cmd exec error")}#[get("/set?<key>&<value>")]fn set(key:String,value:String,redis_pool:&State<Pool<Client>>) -> String {let pool = redis_pool.deref();let mut pconn = pool.get().unwrap();let conn = pconn.deref_mut();let set_result = redis::Cmd::set(key,value).query::<String>(conn);set_result.expect("redis cmd exec error")}#[launch]fn rocket() -> _ {let client = redis::Client::open("redis://127.0.0.1:6379").unwrap();let pool = r2d2::Pool::builder().max_size(15).build(client).unwrap();rocket::build().manage(pool).mount("/", routes![hello]).mount("/redis", routes![get,set])}
step6:启动服务
% cargo runCompiling demo-rust-rocket-redis v0.1.0 (/Users/theti/mystack/infra/demo-rust-rocket-redis)Finished dev [unoptimized + debuginfo] target(s) in 8.28sRunning `target/debug/demo-rust-rocket-redis`🔧 Configured for debug.>> address: 127.0.0.1>> port: 8000>> workers: 8>> ident: Rocket>> keep-alive: 5s>> limits: bytes = 8KiB, data-form = 2MiB, file = 1MiB, form = 32KiB, json = 1MiB, msgpack = 1MiB, string = 8KiB>> tls: disabled>> temp dir: /var/folders/_c/9wlkncwj3fxd1tm_r6t7s22c0000gp/T/>> log level: normal>> cli colors: true>> shutdown: ctrlc = true, force = true, signals = [SIGTERM], grace = 2s, mercy = 3s🛰 Routes:>> (hello) GET />> (get) GET /redis/get?<key>>> (set) GET /redis/set?<key>&<value>📡 Fairings:>> Shield (liftoff, response, singleton)🛡️ Shield:>> X-Frame-Options: SAMEORIGIN>> Permissions-Policy: interest-cohort=()>> X-Content-Type-Options: nosniff🚀 Rocket has launched from http://127.0.0.1:8000GET /redis/set?key=hello&value=rust:>> Matched: (set) GET /redis/set?<key>&<value>>> Outcome: Success>> Response succeeded.GET /redis/get?key=hello:>> Matched: (get) GET /redis/get?<key>>> Outcome: Success>> Response succeeded.
step7:Redis Set
调用set接口,尝试向redis中添加一条键值对hello:rust,返回Ok。
step8:Redis Get
再调用get接口,向Redis查询我们刚刚添加的key,返回rust
step9:再检查
结论
基础能力一: Rocket state 机制
Rocket提供了一套state机制帮助维护一些全局可用的“状态量”。像访问数记录、任务队列或者数据库连接。
基础能力二: r2d2 通用连接池
通过维护一个连接池,解决数据库创建/断开连接过于频繁带来的无效损耗。
为什么要用连接池?因为是web服务,所以应该至少要有这样的能力吧!
基础能力三: redis-rs feature=[“r2d2”] Redis连接库
基于Rust实现的Redis客户端连接库,提供了基础的Redis连接能力,并且开启featurer2d2后, redis-rs具备接入r2d2的能力。
能力聚合一: redis-rs + r2d2 => 连接池对象
r2d2提供的能力就是通用连接池r2d2::Pool。
想要接入连接池:
我们首先需要一个实现了 Trait ManageConnection 的结构体(假定FooManager)。ManageConnection抽象地表达了一个连接的”生命周期”。
pub trait ManageConnection: Send + Sync + 'static {type Connection: Send + 'static;type Error: Error + 'static;fn connect(&self) -> Result<Self::Connection, Self::Error>;fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;fn has_broken(&self, conn: &mut Self::Connection) -> bool;}
然后将我们的FooManager对象传入连接池, 然后就可以得到一个FooManager的连接池
let pool:Pool<FooManager> = r2d2::Pool::builder().max_size(15) // 设置池大小.build(manager) // 传入ManageConnection对象.unwarp()
所以,总的来说就是两步走,先创建一个实现了
r2d2::ManageConnection的对象,然后将其传入连接池的建造方法r2d2::Builder,最后得到了池对象r2d2::Pool<T:r2d2::ManageConnection>。
好消息是redis-rs的r2d2feature提供了r2d2连接池的支持。
use crate::{ConnectionLike, RedisError};// 实现 r2d2::ManageConnection 的宏macro_rules! impl_manage_connection {($client:ty, $connection:ty) => {impl r2d2::ManageConnection for $client {type Connection = $connection;type Error = RedisError;fn connect(&self) -> Result<Self::Connection, Self::Error> {self.get_connection()}fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {if conn.check_connection() {Ok(())} else {Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))}}fn has_broken(&self, conn: &mut Self::Connection) -> bool {!conn.is_open()}}};}// 调用宏,生成 trait implimpl_manage_connection!(crate::Client, crate::Connection);#[cfg(feature = "cluster")]impl_manage_connection!(crate::cluster::ClusterClient,crate::cluster::ClusterConnection);
看一眼源码,这里定义了一个实现r2d2::ManageConnectionTrait的宏(macro)impl_manage_connection,然后调用它为 redis::Client 以及 redis::Connection实现了Traitr2d2::ManageConnection, 并且在集群(redis-cluster)模式下,也为集群客户端提供了连接池的实现支持。
换言之,redis::Client就是我们接入r2d2所需要的那个FooManager。构建一个支持Redis的r2d2连接池的代码最终如下所示:
let manager = redis::Client::open("redis://127.0.0.1:6379").unwrap();let pool = r2d2::Pool::builder().max_size(15).build(manager).unwrap();
能力聚合二: state + 连接池 => 全局可用的连接池
将 r2d2-redis 连接池作为全局状态(state)由rocket接管,这样在每一个请求中就可以按需获取。
这里需要解决两个问题: (1) Rocket State怎么用? (2)如何使用在真实请求中使用这个连接池?
根据官方教程 ManagedState,两步:
- 初始化状态值,调用manage,将状态值转交给rocket,变更全局状态值
- 在任意需要访问state的请求中,加上
&State<T>参数,就可以获取rocket接管的全局状态值
第一步, 很好解决, 创建一个在上一部分(能力聚合一)得到的pool对象,然后调用manage(pool),将这个连接池交给rocket作为全局状态量接管。
第二步,在请求中添加参数redis_pool:&State<Pool<Client>>,得到全局状态量,连接池。但是,正如第二个问题所言,怎么在真实的请求中使用这个连接池?
我们的最终的目标是使用redis-rs提供的能力与Redis进行交互,而目前我们能够拿到的是redis_pool, 一个引用状态连接池对象&State<Pool<Client>>。
redis-rs提供了两种与Redis进行交互API:
其中con: &mut redis::Connection,所以无论哪种方式我们都需要一个redis::Connection对象。
State实现了Deref Trait,由此通过redis_pool.deref()可以拿到连接池&Pool<Client> (State inner()函数也可以,两者实现完全一样)。得到r2d2连接池以后,我们的最终目标当然是从连接池中拿到我们需要的Redis连接对象redis::Connection。
let pool:&Pool<Client> = redis_pool.deref();// 调用r2d2连接池提供的连接获取方法 getlet mut pconn:PooledConnection<Client> = pool.get().unwrap();
然而,连接池提供的连接获取方法get,得到的结果还不是我们想要的redis::Connection。
看一眼 PooledConnection
impl<M> DerefMut for PooledConnection<M>whereM: ManageConnection,{fn deref_mut(&mut self) -> &mut M::Connection {&mut self.conn.as_mut().unwrap().conn}}
这里实现了ManageConnection的是redis::Client,那么Connection不就是redis::Connection(参考redis-rs的r2d2feature,如下所示)
// impl_manage_connection!(crate::Client, crate::Connection);impl r2d2::ManageConnection for $client {type Connection = $connection;type Error = RedisError;...}
所以,绕了这么一大圈,再调用一下pconn.deref_mut()也就得到了&mut redis::Connection,目标达成!
展望
本文所给出的Rocket集成redis方案,整体工作量小,依赖少,而且依赖方redis-rs以及r2d2都比较稳定可靠。
当然,这不是唯一的解法。就整体调研中所见识到的解法还包括 rocket_db_pool、rocket_redis_r2d2(rocket 0.3 不适用于现在)等。
最后,对于本文所给出的demo是否适用于真实的生产环境,扛住性能以及安全的考验,还有待测试。
