译者:华为-王江桐,华为-周紫鹏


原文

继续我们的开源系列,我们为无处不在的reqwest Rust crate提供了一个中间件适配器。

这是我们开源系列的第二篇文章,我们将在其中讨论TrueLayer的工程挑战并开源我们的解决方案。如果你错过了我们的第一篇推文,我们的第一篇推文是Rust中的gRPC负载平衡。(原文/中文月刊

本文主题是reqwest-middleware,一个构建在reqwest HTTP客户端之上的crate,用于提供中间件功能。

问题

当通过网络与内部和外部服务进行通信时,由于服务会失败,大规模运行应用程序需要内置的韧性。

重试是提高可靠性的常用策略。机制相当简单:将每个请求包装在一个循环中并重试,直到获得成功响应或尝试次数用完为止。

我们的代码库中有数十个客户端:我们不想以特别的方式为每个客户端重新实现重试。

同时,我们更愿意让我们的域代码不受这种网络级别的限制——最完美的方式是,在 HTTP 客户端本身中透明地实现重试。

我们可以编写一个RetryHttpClient来包装标准客户端,以增加重试功能——但重试并不是全部。我们希望 HTTP 客户端处理其他功能:分布式跟踪header的传播、缓存、日志记录。

但是我们不想编写TracingRetryableClient、TracingRetryableCachingHttpClient、RetryableTracingCachingHttpClient(顺序很重要!)以及所有其他可能的组合。

我们想要一种可组合的抽象模式。

所有这些功能都遵循相同的模式:

  • 我们想在执行请求之前和之后运行一些任意逻辑

  • 该逻辑完全独立于问题域,它只关注底层传输和整个组织统一的要求(例如日志记录标准)。

好消息是,这是软件系统中常见的一个问题,因此也有一个通用的解决方案:中间件。

(P.S.:请注意,我们在本文中指的是一种非常特殊的中间件,该术语本身更为笼统。有关中间件在其他上下文中的用法,请参阅中间件的维基百科页面

Rust HTTP客户端中间件

在TrueLayer,我们使用reqwest作为我们所有Rust服务的 HTTP 客户端。

我们选择它是因为它提供了async-first API,与tokio兼容,并且它已广泛的在生产中使用。

遗憾的是,reqwest不支持现有即用的中间件。

我们的选择是什么?

  • 使用现成的crate替换reqwest,或者在reqwest之上做拓展。在撰写本文时,对我们来说,没有其他完善的、支持中间件的 Rust HTTP 客户端能够提供与reqwest一样的功能。surf非常流行并且内置中间件,但它需要引入async-std

  • 尝试去获取上游实现的中间件支持。reqwest的维护者从 2017 年开始讨论这个问题(请参阅ISSUE),但似乎仍然没有达成共识,甚至没有就此类功能是否属于该crate 达成共识。因此,我们不太可能在短期内完成某些事情。

  • 最后一个选择是,包装reqwest并在其上实现中间件,所以这就是我们采用的方法。reqwest-middleware诞生了。

使用reqwest-middleware我们能够将中间件附加到Client上,然后就像我们直接使用reqwest一样发出请求:

  1. use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
  2. use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
  3. use reqwest_tracing::TracingMiddleware;
  4. #[tokio::main]
  5. async fn main() {
  6. let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
  7. let client = ClientBuilder::new(reqwest::Client::new())
  8. .with(TracingMiddleware)
  9. .with(RetryTransientMiddleware::new_with_policy(retry_policy))
  10. .build();
  11. run(client).await;
  12. }
  13. async fn run(client: ClientWithMiddleware) {
  14. // free retries!
  15. client
  16. .get("https://some-external-service.com")
  17. .header("foo", "bar")
  18. .send()
  19. .await
  20. .unwrap();
  21. }

现有技术

在讨论我们的实现之前,让我们先看看现有的一些常用的中间件API:

Surf

Surf 是一个Rust HTTP客户端。这是他们文档中的中间件示例:

  1. /// Log each request's duration
  2. #[derive(Debug)]
  3. pub struct Logger;
  4. #[surf::utils::async_trait]
  5. impl Middleware for Logger {
  6. async fn handle(
  7. &self,
  8. req: Request,
  9. client: Client,
  10. next: Next<'_>,
  11. ) -> Result<Response> {
  12. println!("sending request to {}", req.url());
  13. let now = time::Instant::now();
  14. let res = next.run(req, client).await?;
  15. println!("request completed ({:?})", now.elapsed());
  16. Ok(res)
  17. }
  18. }

我们能看到,它接受一个请求对象和一个next值,该值可用于将该请求转发到剩余的管道中,并返回一个Response。这让我们在向下转发之前,可以通过改变请求方式来处理请求,我们还可以在返回之前更改从next.run返回的res值。

我们甚至可以在next周围使用控制流,它允许重试和短路:

  1. #[derive(Debug)]
  2. pub struct ConditionalCall;
  3. #[surf::utils::async_trait]
  4. impl Middleware for ConditionalCall {
  5. async fn handle(
  6. &self,
  7. req: Request,
  8. client: Client,
  9. next: Next<'_>,
  10. ) -> Result<Response> {
  11. // Silly example: return a dummy response 50% of the time
  12. if rand::random()::<bool>() {
  13. let res = next.run(req, client).await?;
  14. Ok(res)
  15. } else {
  16. let response = http_types::Response::new(StatusCode::Ok);
  17. Ok(response)
  18. }
  19. }
  20. }

Express

Express是一个完善的Node.js Web框架。它的中间件被编写为普通函数,这是他们文档中的一个例子:

  1. app.use(function (req, res, next) {
  2. console.log('Time:', Date.now())
  3. next()
  4. })

这与surf的方法非常相似,除了我们使用response对象并可以直接改变它:中间件函数不返回任何内容。

Tower

tower是用于网络应用程序的通用Rust组件库。

它被用于许多著名crate中,例如hyper和tonic。tower的中间件有点复杂,很可能是因为,他们不想强制用户使用动态调度(例如async_trait)。

至于其他库,这是tower文档中给出的示例:

  1. pub struct LogLayer {
  2. target: &'static str,
  3. }
  4. impl<S> Layer<S> for LogLayer {
  5. type Service = LogService<S>;
  6. fn layer(&self, service: S) -> Self::Service {
  7. LogService {
  8. target: self.target,
  9. service
  10. }
  11. }
  12. }
  13. // This service implements the Log behavior
  14. pub struct LogService<S> {
  15. target: &'static str,
  16. service: S,
  17. }
  18. impl<S, Request> Service<Request> for LogService<S>
  19. where
  20. S: Service<Request>,
  21. Request: fmt::Debug,
  22. {
  23. type Response = S::Response;
  24. type Error = S::Error;
  25. type Future = S::Future;
  26. fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
  27. self.service.poll_ready(cx)
  28. }
  29. fn call(&mut self, request: Request) -> Self::Future {
  30. // Insert log statement here or other functionality
  31. println!("request = {:?}, target = {:?}", request, self.target);
  32. self.service.call(request)
  33. }
  34. }

忽略用于反压的poll_ready方法,tower的Service被定义为从请求到响应的函数:call返回一个Future,其中Future::Item是Service::Response的关联类型。

surf中的异步中间件的trait更为简单,因为它依赖于过程宏(async_trait),在trait中使用async fn语法——在底层它转换为boxing futures 。这是必要的,因为trait方法尚不支持异步。请参阅Nicholas D. Matsakis的这篇文章以深入了解原因。

tower中的中间件是通过Layer trait定义的,该trait将一个服务映射到另一个服务。实现这个特性通常涉及让一个通用结构包装一些Service并委托对它的调用。

被包装的Service与surf和express中的next参数起到相同的作用。它提供了一种调用中间件链其余部分的方法。这种方法仍然允许我们使用next的API相同的方式处理请求和响应。

Finagle

Finagle是一个用Scala编写的JVM RPC系统。让我们也从finagle文档中举一个中间件示例:

  1. class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer)
  2. extends SimpleFilter[Req, Rep] {
  3. def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
  4. val res = service(request)
  5. res.within(timer, timeout)
  6. }
  7. }

这里的Service与tower非常相似:一个从请求到响应的函数。

Finagle中的中间件称为Filter。Filter类型比tower的Layer更复杂,因为它不要求apply中的Req和Rep类型与服务参数中请求和回复的类型保持一致。

SimpleFilter,顾名思义,是具有固定请求/响应类型的简化版本。SimpleFilter将一个请求和包装服务作为参数,并返回一个响应,因此它的功能类似tower API,但是将Layer::layer和Service::call压缩到了单个SimpleFilter::apply方法中。

中间件类型

一般来说,你会发现,中间件API分为两类:要么是一个参数为请求和next的函数,就像surf和express;或者从一个映射服务到另一个,就像tower和Finagle.

总的来说,这两种方法都提供了同样多的灵活性。两者都需要每个中间件至少有一个额外的动态分发,因为 Rust不支持在 trait 方法的返回类型中包含impl Trait(目前),所以我们采用Next方法,因为这使得更容易实现中间件。surf和tower之间的差异证明了这一点。

reqwest-中间件

我们最终得到了一个非常标准的中间件API(有关API的更详细描述,请参阅文档):

  1. #[async_trait]
  2. pub trait Middleware {
  3. async fn handle(&self, req: Request, extensions: &mut Extensions, next: Next<'_>)
  4. -> Result<Response>;
  5. }

Extensions用于以类型安全的方式跨中间件获取任意信息,不论是从外部中间件到更深的中间件,还是从内部中间件到以前的中间件。

出于演示目的,举例一个简单的日志中间件实现:

  1. use reqwest::{Request, Response};
  2. use reqwest_middleware::{Middleware, Next};
  3. use truelayer_extensions::Extensions;
  4. struct LoggingMiddleware;
  5. #[async_trait::async_trait]
  6. impl Middleware for LoggingMiddleware {
  7. async fn handle(
  8. &self,
  9. req: Request,
  10. extensions: &mut Extensions,
  11. next: Next<'_>,
  12. ) -> reqwest_middleware::Result<Response> {
  13. tracing::info!("Sending request {} {}", req.method(), req.url());
  14. let resp = next.run(req, extensions).await?;
  15. tracing::info!("Got response {}", resp.status());
  16. Ok(resp)
  17. }
  18. }
  1. use reqwest_middlewar::ClientBuilder;
  2. #[tokio::main]
  3. async fn main() {
  4. tracing_subscriber::fmt::init();
  5. let client = ClientBuilder::new(reqwest::Client::new())
  6. .with(LoggingMiddleware)
  7. .build();
  8. client
  9. .get("https://truelayer.com/")
  10. .send()
  11. .await
  12. .unwrap();
  13. }
  1. $ RUST_LOG=info cargo run
  2. Jul 20 19:59:35.585 INFO post_reqwest_middleware: Sending request GET https://truelayer.com/
  3. Jul 20 19:59:35.705 INFO post_reqwest_middleware: Got response 200 OK

结论

我们使用启用中间件的客户端包装reqwest,该客户端使用相同的简单API。这使得能够为我们的韧性和可观察性需求构建可重用的组件。

最重要的是,我们还发布了reqwest-retry和reqwest-opentracing,它们应该能涵盖reqwest crate很多的使用场景。

开发人员现在可以通过导入几个crate并将with_middleware调用添加到客户端设置代码来强化与远程HTTP的集成——而不会中断任何其他应用程序代码。