作者: 槟橙炮炮

槟橙炮炮离开了原来的工作岗位,准备找一份新的工作。如果你对他的工作感兴趣并需要一名 Rust工程师,请与他联系 binchengZhao@outlook.com .


What is delicate ?

delicate 一个轻量的分布式的任务调度平台.

  1. ![](https://delicate-rs-1301941387.cos.ap-beijing.myqcloud.com/delicate-rs/delicate_logo.png#height=129&id=OwXX9&originHeight=505&originWidth=488&originalType=binary&ratio=1&status=done&style=none&width=125)

特性

  • 友好的用户界面: [前端]方便地管理任务和执行器,监控其状态,支持手动维护运行中的任务等。
  • 灵活的操作: 灵活的任务操作,支持限制单个节点的最大并行数,与cron表达式相对应的时区设置,调度模式(单一、固定数量、不断重复),能够在任何时候手动触发任务,手动终止任务实例,在线查看任务日志。
  • 高可用性: Delicate支持横向扩展。通过部署尽可能多的Delicate服务器和执行器,很容易实现高可用性和性能。
  • 高性能: 轻量级和基本功能加快了性能,`delicate’的基本资源开销大约是(小于0.1%的cpu使用率,10m的内存.)
  • 可观察性: 有许多有意义的统计数据定期以图表的方式展现。
  • 升级: 系统的动态升级(升级是通过获得最新的源代码和进行数据库迁移.)
  • 复用性: 执行器提供restful-api ,可以让用户应用维护自定义任务.
  • 权限管理: 基于casbin实现的权限管理功能,持续优化体验.

delicate 架构图:

architecture.svg

topology.svg

项目效果图参见:https://github.com/BinChengZhao/delicate/tree/main/doc/src/_media

技术栈

  • 后端( scheduler & executor ): Rust
  • 原主要的依赖: (actix-web & diesel & delay-timer & serde & tracing)
  • 现主要的依赖: (poem & tokio & diesel & delay-timer & serde & tracing)
  • 前端: antd-admin (React)
  • Ui: Ant Design
  • 数据库: mysql , postgres (计划支持)

为什么要迁移到poem

  • 在使用 actix-web 迭代时, 由于actix-web 4 稳定版一直没有正式发布,想升级核心依赖,引入新功能, 却被动受限。技术栈革新是一个刻不容缓的计划,当poem发布时我知道机会来了。
  • 在使用 poem 且透明的依赖tokio时,我感到了前所未有的灵活性。
    直接使用tokio生态的组件,去代替原来 actix-web 的一些组件,并且把大量的依赖进行了升级,
    再也不用自己人工制作补丁,或者使用陈旧的依赖。

关于 poem的简要背景。

  1. 该框架具有极快的性能,一致的理念,以及清晰的实现。
  2. 基于hyper,与tokio结合,用户有更多的控制。

迁移的重点:

  1. 网络组件的重新组合,不同风格的维护应用状态。
  2. api级别的修改,避免业务逻辑调整。

迁移前的基本梳理:

  • poem中的handler是一个会生成一个FutureEndpoint对象,框架与tokio的协作可以让请求在多线程运行时中进行效计算。
    而actix-web则不是这样,它是内部由多个单线程的Runtime组成。
    由于这种微妙的差别,以前用于actix-web的handler不能直接用于poem
    因为需要确保每一个handler的输入状态,并且保证跨越.await的值需要都Send。
  • poem的路由是一个可嵌套的Endpoint数据结构,与原来actix-web的配置不同。
  • poem公开的数据结构大多支持Send,可以高效利用线程资源, 而actix-web与之相反。
  • 需要修改所有的中间件实现,改造所有的后台Task,调整所有的全局状态。
  • 在直接依赖 tokio 1.0 的情况下升级多处依赖。
  • 全链路的测试,并编写迁移纪要。

下面是一些 poem & actix-web 的对比:

路由侧

之前基于actix-web的实现,大量路由组通过configure去注册, 应用状态通过 app_data注册,中间件通过wrap注册:

  1. let app = App::new()
  2. .configure(actions::task::config)
  3. .configure(actions::user::config)
  4. .configure(actions::task_log::config)
  5. .configure(actions::executor_group::config)
  6. .configure(actions::executor_processor::config)
  7. .configure(actions::executor_processor_bind::config)
  8. .configure(actions::data_reports::config)
  9. .configure(actions::components::config)
  10. .configure(actions::operation_log::config)
  11. .configure(actions::user_login_log::config)
  12. .app_data(shared_delay_timer.clone())
  13. .app_data(shared_connection_pool.clone())
  14. .app_data(shared_scheduler_meta_info.clone())
  15. .wrap(components::session::auth_middleware())
  16. .wrap(components::session::session_middleware());

路由配置实例:

  1. pub fn config(cfg: &mut web::ServiceConfig) {
  2. cfg.service(create_user)
  3. .service(show_users)
  4. .service(update_user)
  5. .service(delete_user)
  6. .service(login_user)
  7. .service(logout_user)
  8. .service(check_user)
  9. .service(change_password)
  10. .service(roles)
  11. .service(permissions)
  12. .service(append_permission)
  13. .service(delete_permission)
  14. .service(append_role)
  15. .service(delete_role);
  16. }

handler 处理请求示例:

  1. #[post("/api/user/create")]
  2. async fn create_user(
  3. web::Json(user): web::Json<model::QueryNewUser>,
  4. pool: ShareData<db::ConnectionPool>,
  5. ) -> HttpResponse {
  6. // do someting.
  7. }

现基于 poem的实现, 大量路由组通过Route去组织,可以多重嵌套。应用状态 & 中间件都是通过with注册, 所有的组件都有通用的特征 Endpoint

  1. let app = Route::new().nest_no_strip(
  2. "/api",
  3. Route::new()
  4. .nest_no_strip("/api/task", actions::task::route_config())
  5. .nest_no_strip("/api/user", actions::user::route_config())
  6. .nest_no_strip("/api/role", actions::role::route_config())
  7. .nest_no_strip("/api/task_log", actions::task_log::route_config())
  8. .nest_no_strip("/api/tasks_state", actions::data_reports::route_config())
  9. .nest_no_strip("/api/task_instance", actions::task_instance::route_config())
  10. .nest_no_strip("/api/binding", actions::components::binding::route_config())
  11. .nest_no_strip("/api/operation_log", actions::operation_log::route_config())
  12. )
  13. .with(shared_delay_timer)
  14. .with(shared_connection_pool)
  15. .with(shared_scheduler_meta_info)
  16. .with(shared_request_client)
  17. .with(components::session::auth_middleware())
  18. .with(components::session::cookie_middleware());

poem中的路由配置实例:

  1. pub fn route_config() -> Route {
  2. Route::new()
  3. .at("/api/user/create", post(create_user))
  4. .at("/api/user/list", post(show_users))
  5. .at("/api/user/update", post(update_user))
  6. .at("/api/user/delete", post(delete_user))
  7. .at("/api/user/login", post(login_user))
  8. .at("/api/user/logout", post(logout_user))
  9. .at("/api/user/check", post(check_user))
  10. .at("/api/user/change_password", post(change_password))
  11. .at("/api/user/roles", post(roles))
  12. .at("/api/user/permissions", post(permissions))
  13. .at("/api/user/append_permission", post(append_permission))
  14. .at("/api/user/delete_permission", post(delete_permission))
  15. .at("/api/user/append_role", post(append_role))
  16. .at("/api/user/delete_role", post(delete_role))
  17. }

poemhandler 处理请求示例:

  1. async fn create_user(
  2. web::Json(user): web::Json<model::QueryNewUser>,
  3. pool: ShareData<db::ConnectionPool>,
  4. ) -> HttpResponse {
  5. // do someting.
  6. }

poem 理念的代入:

handler

poem 中的 handler, 与actix-web差异并不大只需要调整一些 extractor, 对于一些阻塞性的task,切换到tokio的api去计算

  1. #[handler]
  2. async fn show_task_log_detail(
  3. Json(query_params): Json<model::RecordId>,
  4. pool: Data<&Arc<db::ConnectionPool>>,
  5. ) -> impl IntoResponse {
  6. use db::schema::task_log_extend;
  7. if let Ok(conn) = pool.get() {
  8. let f_result = spawn_blocking::<_, Result<_, diesel::result::Error>>(move || {
  9. let task_log_extend = task_log_extend::table
  10. .find(query_params.record_id.0)
  11. .first::<model::TaskLogExtend>(&conn)?;
  12. Ok(task_log_extend)
  13. })
  14. .await;
  15. let log_extend = f_result
  16. .map(|log_extend_result| {
  17. Into::<UnifiedResponseMessages<model::TaskLogExtend>>::into(log_extend_result)
  18. })
  19. .unwrap_or_else(|e| {
  20. UnifiedResponseMessages::<model::TaskLogExtend>::error()
  21. .customized_error_msg(e.to_string())
  22. });
  23. return Json(log_extend);
  24. }
  25. Json(UnifiedResponseMessages::<model::TaskLogExtend>::error())
  26. }

Endpoint

Endpoint 抽象HTTP请求的Trait, 是所有 handler 的真实面貌。

你可以实现Endpoint来创建你自己的Endpoint处理器。

如下是Endpoint的定义:

  1. /// An HTTP request handler.
  2. #[async_trait::async_trait]
  3. pub trait Endpoint: Send + Sync {
  4. /// Represents the response of the endpoint.
  5. type Output: IntoResponse;
  6. /// Get the response to the request.
  7. async fn call(&self, req: Request) -> Self::Output;
  8. }

poemEndpoint 哲学,跟tower中的Service非常相似,但是poem更简洁一些, 并且poem也兼容tower可以复用其生态与组件。

  1. /// `Service` provides a mechanism by which the caller is able to coordinate
  2. /// readiness. `Service::poll_ready` returns `Ready` if the service expects that
  3. /// it is able to process a request.
  4. pub trait Service<Request> {
  5. /// Responses given by the service.
  6. type Response;
  7. /// Errors produced by the service.
  8. type Error;
  9. /// The future response value.
  10. type Future: Future<Output = Result<Self::Response, Self::Error>>;
  11. /// Returns `Poll::Ready(Ok(()))` when the service is able to process
  12. fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
  13. /// Process the request and return the response asynchronously.
  14. fn call(&mut self, req: Request) -> Self::Future;
  15. }

IntoResponse

IntoResponse 是响应数据的抽象。

所有可以转换为HTTP响应的Response类型都应该实现IntoResponse,并且它们可以作为handler的返回值。

  1. pub trait IntoResponse: Send {
  2. /// Consume itself and return [`Response`].
  3. fn into_response(self) -> Response;
  4. /// Wrap an `impl IntoResponse` to add a header.
  5. fn with_header<K, V>(self, key: K, value: V) -> WithHeader<Self>
  6. where
  7. K: TryInto<HeaderName>,
  8. V: TryInto<HeaderValue>,
  9. Self: Sized,
  10. {
  11. let key = key.try_into().ok();
  12. let value = value.try_into().ok();
  13. WithHeader {
  14. inner: self,
  15. header: key.zip(value),
  16. }
  17. }
  18. /// Wrap an `impl IntoResponse` to set a status code.
  19. fn with_status(self, status: StatusCode) -> WithStatus<Self>
  20. where
  21. Self: Sized,
  22. {
  23. WithStatus {
  24. inner: self,
  25. status,
  26. }
  27. }
  28. /// Wrap an `impl IntoResponse` to set a body.
  29. fn with_body(self, body: impl Into<Body>) -> WithBody<Self>
  30. where
  31. Self: Sized,
  32. {
  33. WithBody {
  34. inner: self,
  35. body: body.into(),
  36. }
  37. }
  38. }

middleware

使用poem制作中间件非常的轻松,如下是一个给请求增加 logger-id 的middlware的示例:

  1. // Unit-struct of logger-id for impl Middleware.
  2. pub struct LoggerId;
  3. impl<E: Endpoint> Middleware<E> for LoggerId {
  4. type Output = LoggerIdMiddleware<E>;
  5. fn transform(&self, ep: E) -> Self::Output {
  6. LoggerIdMiddleware { ep }
  7. }
  8. }
  9. // Wraps the original handler and logs the processing of the request internally.
  10. pub struct LoggerIdMiddleware<E> {
  11. ep: E,
  12. }
  13. #[poem::async_trait]
  14. impl<E: Endpoint> Endpoint for LoggerIdMiddleware<E> {
  15. type Output = E::Output;
  16. async fn call(&self, req: Request) -> Self::Output {
  17. let unique_id = get_unique_id_string();
  18. self.ep
  19. .call(req)
  20. .instrument(info_span!("logger-", id = unique_id.deref()))
  21. .await
  22. }
  23. }

如下是actix-web 实现middlware的模板示例,模板代码确实稍有冗长,且耐人寻味。

pub(crate) struct CasbinService;

impl<S, B> Transform<S> for CasbinService
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = ActixWebError>
        + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = ActixWebError;
    type InitError = ();
    type Transform = CasbinAuthMiddleware<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ok(CasbinAuthMiddleware {
            service: Rc::new(RefCell::new(service)),
        })
    }
}

pub struct CasbinAuthMiddleware<S> {
    service: Rc<RefCell<S>>,
}


impl<S, B> Service for CasbinAuthMiddleware<S>
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = ActixWebError>
        + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = ActixWebError;
    type Future = MiddlewareFuture<Self::Response, Self::Error>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, req: ServiceRequest) -> Self::Future {
        Box::pin(async move {

           // do something.
           return service.call(req).await;
        })
    }

总结

  1. 本次迁移涉及到45个文件的修改和4000行代码的修改(增加了2500行,删除了1579行)。
  2. 切换到poem上,陈旧依赖得以升级,且透明的依赖tokio生态时,项目拥有了前所未有的灵活性。再也不用自己人工制作补丁,或者使用陈旧的依赖。
  3. 迁移后有了poem & tokio 生态加持更容易扩展功能,也降低了维护成本。
  4. 在不影响性能指标的情况下,更好的提升了资源利用率,发挥多核优势。

感谢

在迁移过程中,我有一些需求使用poem 是无法直接处理
随后在 poem 上打开了几个issues,不到一天内就与作者沟通完成,并在poem支持了该功能,太强大了!

  • 我要感谢整个社区和代码贡献者。特别是poem的作者:
    油条哥
  • 感谢用户报告文档中的拼写错误, 这非常感谢大家。
  • 感谢用户加入我们,提供反馈,讨论功能,并获得帮助!
  • 我也很感谢actix-web社区这么好的作品,因为技术选择问题,我决定迁移到poem

Repos:

poem

delicate