开发笔记 后端架构
我会在闲暇时间来分享一些Rust的学习体会,内容尽可能是时下最新的,也欢迎各位来与我一起交流,内容喜欢的话可以点赞关注我❤️️
actix.png

✏️ 前言

说到Rust在服务端的开发,我觉得Actix-web + Diesel是目前来说最为通用的一个组合,高效的Web框架,搭配简单好用的ORM,掌握技巧之后,开发的舒适程度已经相当不错了,速度上因为Diesel的原因肯定会差一些(具体可以参考TechEmpower,actix-diesel在大多数栏目里还是处于一个相对靠前的位置),但是我觉得这是一个开发难度和性能上非常不错的均衡了。

本文概要

本文会分别展示我在SHUpdtp和初代项目online_judge使用的两套与数据库连接的方案,并比较它们的优劣。最后我也会分享Actix + Diesel这个组合在项目架构上的一些经验总结,帮助大家更好的去构建后端项目。

canduma是我非常喜欢的一个项目,是一个简单的鉴权服务的demo。SHUpdtp和online_judge都是相当程度上参考了canduma之后才搭建起来的,网上推荐这个项目的文章并不多,所以这里我还是要强烈推荐一下这个项目,希望看完文章以后,大家能花更多的时间多看看这个项目,会很有帮助的。

🎓 数据库连接

为实现高效的数据库连接,可异步的数据库查询时十分有必要的。online_judge采用的是SyncArbiter分配任务的方式,而SHUpdtp采用的是阻塞函数访问连接池的方式,这个栏目我将具体介绍它们的实现细节。

Actor管理连接

Actor_diesel.png
Actor管理连接的原理是将数据库操作定义为Actor的一种动作,由SyncArbiter创建多个线程级别的Actor,使它们能够并发的处理数据库操作,示意图如上。

首先我们准备数据查询Actor(名为DbExecutor),Actor当中存放数据库连接,并设定一个SyncArbiter的启动函数,方便我们批量启动多个同步Actor(SyncArbiter用法,可以参考👈)。

  1. use diesel::prelude::*;
  2. use actix::prelude::*;
  3. use crate::statics::DATABASE_URL;
  4. pub struct DbExecutor(pub PgConnection);
  5. impl Actor for DbExecutor {
  6. type Context = SyncContext<Self>;
  7. }
  8. /// This is state where we will store *DbExecutor* address.
  9. pub struct DBState {
  10. pub db: Addr<DbExecutor>,
  11. }
  12. pub fn create_db_executor() -> Addr<DbExecutor> {
  13. let database_url = (*DATABASE_URL).clone();
  14. SyncArbiter::start(4, move || {
  15. DbExecutor(PgConnection::establish(&database_url).unwrap())
  16. })
  17. }

做好上述准备以后,就可以在main.rs当中准备主函数,启动时获得数据查询Actor的addr方便在需要时对其发送工作指令。

  1. #[actix_web::main]
  2. async fn main() -> io::Result<()> {
  3. // ..
  4. let db_addr = create_db_executor();
  5. HttpServer::new(move || {
  6. App::new()
  7. .data(DBState { db: db_addr.clone() })
  8. // ..
  9. })
  10. .bind("0.0.0.0:8080")?
  11. .run()
  12. .await
  13. }

之后定义数据查询Actor的工作行为,这里以login为例子,先定义工作指令。

  1. #[derive(Debug, Clone, Deserialize)]
  2. pub struct LoginMessage {
  3. pub identity_info: String,
  4. pub password: String,
  5. }
  6. impl Message for LoginMessage {
  7. type Result = Result<OutUser, String>;
  8. }

再为DbExecutor实现Handler使其在接收到工作指令LoginMessage时能做出相应的动作。

  1. impl Handler<LoginMessage> for DbExecutor {
  2. type Result = Result<OutUser, String>;
  3. fn handle(&mut self, msg: LoginMessage, _: &mut Self::Context) -> Self::Result {
  4. use crate::schema::users::dsl::*;
  5. let operation_result =
  6. if msg.identity_info.is_email() {
  7. users.filter(email.eq(msg.identity_info)).limit(1).load::<User>(&self.0)
  8. } else if msg.identity_info.is_mobile() {
  9. users.filter(mobile.eq(msg.identity_info)).limit(1).load::<User>(&self.0)
  10. } else {
  11. users.filter(username.eq(msg.identity_info)).limit(1).load::<User>(&self.0)
  12. }
  13. .expect("Error loading user.")
  14. .pop();
  15. if !operation_result.is_none() {
  16. let user = operation_result.unwrap();
  17. if make_hash(&msg.password, &user.salt) == user.hash.as_ref() {
  18. Ok(OutUser::from(user))
  19. } else {
  20. Err("Wrong password.".to_owned())
  21. }
  22. } else {
  23. Err("Can't find your Account.".to_owned())
  24. }
  25. }
  26. }

在服务层,若需要进行login操作,则需要发送LoginMessage委托DbExecutor完成。

  1. pub async fn login(
  2. data: web::Data<DBState>,
  3. form: web::Form<LoginMessage>,
  4. id: Identity,
  5. ) -> HttpResponse {
  6. // Send message to `DbExecutor` actor
  7. let res = data.db
  8. .send(form.to_owned())
  9. .await;
  10. // ..
  11. }

以上便是使用Actor管理连接对应一个API接口所要做的全部内容,文中我启动了4个同步的的Actor相比1个Actor确实可以达到将近4倍的吞吐。

然而这样的写法有两个缺点:

  • Actor若发送异常阻塞则会一直占用一条数据查询生产线,导致后端效能的下降,甚至到最后完全丧失查询能力。
  • Handler实现起来相对来说较为繁琐,若设计不当,则会增加编程负担。

阻塞函数访问连接池

为了改进上述两个问题,我最终在SHUpdtp(也就是online_judge的重构版本)中采用了和canduma一样的方式,具体方法是将,一次数据库操作分配到actix-web运行时中的一个线程池中去,由运行时自行处理异步。在进行数据库操作时,先从r2d2连接池当中获取一个一个连接,再进行既定操作。

我们先准备数据库的连接池,准备好初始化办法和获取连接的函数。

  1. use crate::errors::ServiceError;
  2. pub mod pool;
  3. use diesel::r2d2::PoolError;
  4. type ConnectionManager = diesel::r2d2::ConnectionManager<diesel::pg::PgConnection>;
  5. pub type Pool = diesel::r2d2::Pool<ConnectionManager>;
  6. pub type PooledConnection = diesel::r2d2::PooledConnection<ConnectionManager>;
  7. pub fn db_connection(pool: &Pool) -> Result<PooledConnection, ServiceError> {
  8. Ok(pool.get().map_err(|_| ServiceError::UnableToConnectToDb)?)
  9. }
  1. use super::{ConnectionManager, Pool, PoolError};
  2. fn init_pool(database_url: &str) -> Result<Pool, PoolError> {
  3. let manager = ConnectionManager::new(database_url);
  4. Pool::builder().build(manager)
  5. }
  6. pub(crate) fn establish_connection(opt: crate::cli_args::Opt) -> Pool {
  7. init_pool(&opt.database_url).expect("Failed to create pool")
  8. }

启动时将连接池作为静态量存放在服务运行时当中。

  1. #[actix_web::main]
  2. async fn main() -> std::io::Result<()> {
  3. //..
  4. let pool = database::pool::establish_connection(opt.clone());
  5. // ..
  6. HttpServer::new(move || {
  7. App::new()
  8. .data(pool.clone())
  9. // ..
  10. })
  11. .bind(("0.0.0.0", opt.port))
  12. .unwrap()
  13. .run()
  14. .await
  15. }

以login服务为例,先通过db_connection方法获取数据库连接,在进行后续操作。

  1. pub fn login(
  2. account: String,
  3. password: String,
  4. pool: web::Data<Pool>,
  5. ) -> ServiceResult<SlimUser> {
  6. let conn = &db_connection(&pool)?;
  7. use crate::schema::users as users_schema;
  8. let user: User = users_schema::table.filter(users_schema::account.eq(account)).first(conn)?;
  9. if user.hash.is_none() || user.salt.is_none() {
  10. let hint = "Password was not set.".to_string();
  11. Err(ServiceError::BadRequest(hint))
  12. } else {
  13. let hash = utils::make_hash(&password, &user.clone().salt.unwrap()).to_vec();
  14. if Some(hash) == user.hash {
  15. Ok(SlimUser::from(user))
  16. } else {
  17. let hint = "Password is wrong.".to_string();
  18. Err(ServiceError::BadRequest(hint))
  19. }
  20. }
  21. }

调用时通过web::block方法,使得数据查询服务在线程池当中运行。

  1. #[post("/login")]
  2. pub async fn login(
  3. body: web::Json<LoginBody>,
  4. identity: Identity,
  5. pool: web::Data<Pool>,
  6. ) -> Result<HttpResponse, ServiceError> {
  7. let res = web::block(move || user::login(
  8. body.account.clone(),
  9. body.password.clone(),
  10. pool
  11. )).await.map_err(|e| {
  12. eprintln!("{}", e);
  13. e
  14. })?;
  15. // ..
  16. }

不难看出,这种方法相较前者节省了大量的代码量,从开发流程上来讲,将异步行为通过web::block透明化了,降低了结构管理的难度。

💼项目结构建议

根据常用的后端三层架构(控制层、服务层、持久层),我依旧推荐大家,将这三层分为三个目录进行编写,如果按照功能类别进行划分(如user、problem、contest…)虽然不影响三层架构,但是一旦出现跨包的模块使用,会使代码变得非常的混乱(以上经验是通过online_judge和SHUpdtp两个项目比较得出的)。

持久层当中,我们主要做的就是为ORM准备映射用的结构体,我推荐准备一下四类结构:

  1. 查询原始数据时所需要的Raw类型结构体

    1. #[derive(Debug, Clone, Serialize, Deserialize, Queryable)]
    2. pub struct User {
    3. pub id: i32,
    4. pub salt: Option<String>,
    5. pub hash: Option<Vec<u8>>,
    6. pub account: String,
    7. pub mobile: Option<String>,
    8. pub role: String,
    9. }
  2. 插入数据时所需要的Insert类型结构体

    1. #[derive(Debug, Insertable)]
    2. #[table_name = "users"]
    3. pub struct InsertableUser {
    4. pub salt: Option<String>,
    5. pub hash: Option<Vec<u8>>,
    6. pub account: String,
    7. pub mobile: Option<String>,
    8. pub role: String,
    9. }
  3. 更新时涉及的Changeset类型结构体

    1. #[derive(AsChangeset)]
    2. #[table_name="users"]
    3. pub struct UserForm {
    4. pub salt: Option<String>,
    5. pub hash: Option<Vec<u8>>,
    6. pub account: Option<String>,
    7. pub mobile: Option<String>,
    8. pub role: Option<String>,
    9. }
  4. 暴露给用户的Slim、Detail或者Out类型结构体(它们通过实现From特征可以实现一个从Raw类型的快速的形式转换) ```rust

    [derive(Serialize)]

    pub struct OutUser { pub id: i32, pub account: String, pub mobile: Option, pub role: String, }

impl From for OutUser { fn from(user: User) -> Self { Self { id: user.id, account: user.account, mobile: user.mobile, role: user.role } } } ``` 具体为什么这么设计,需要了解diesel具体的使用方法,稍微做几个案例,就能发现它的好用之处了。