通过 CTX 在阶段之间共享状态

使用 CTX

用户在请求的不同阶段实现的自定义过滤器并不直接相互交互。为了在过滤器之间共享信息和状态,用户可以定义一个 CTX 结构体。每个请求拥有一个单独的 CTX 对象。所有过滤器都能够读取和更新 CTX 对象的成员。CTX 对象将在请求结束时被丢弃。

示例

在以下示例中,代理在 request_filter 阶段解析请求头部,它存储了一个布尔标志,以便在稍后的 upstream_peer 阶段使用该标志来决定将流量路由到哪个服务器。(技术上,头部可以在 upstream_peer 阶段解析,但我们只是为了演示而在更早的阶段进行解析。)

  1. pub struct MyProxy();
  2. pub struct MyCtx {
  3. beta_user: bool,
  4. }
  5. fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
  6. // 一些简单的逻辑来检查用户是否是 beta 用户
  7. req.headers.get("beta-flag").is_some()
  8. }
  9. #[async_trait]
  10. impl ProxyHttp for MyProxy {
  11. type CTX = MyCtx;
  12. fn new_ctx(&self) -> Self::CTX {
  13. MyCtx { beta_user: false }
  14. }
  15. async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
  16. ctx.beta_user = check_beta_user(session.req_header());
  17. Ok(false)
  18. }
  19. async fn upstream_peer(
  20. &self,
  21. _session: &mut Session,
  22. ctx: &mut Self::CTX,
  23. ) -> Result<Box<HttpPeer>> {
  24. let addr = if ctx.beta_user {
  25. info!("I'm a beta user");
  26. ("1.0.0.1", 443)
  27. } else {
  28. ("1.1.1.1", 443)
  29. };
  30. let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
  31. Ok(peer)
  32. }
  33. }

在请求之间共享状态

共享诸如计数器、缓存和其他信息等状态在请求之间是常见的。在 Pingora 中共享请求之间的资源和数据并没有什么特别需要的。可以使用 Arcstatic 或任何其他机制。

示例

让我们修改上面的示例,以跟踪 beta 访客的数量以及总访客的数量。计数器可以定义在 MyProxy 结构体本身中,也可以定义为全局变量。由于计数器可以被并发访问,这里使用了 Mutex。

  1. // 全局计数器
  2. static REQ_COUNTER: Mutex<usize> = Mutex::new(0);
  3. pub struct MyProxy {
  4. // 服务的计数器
  5. beta_counter: Mutex<usize>, // AtomicUsize 也可以
  6. }
  7. pub struct MyCtx {
  8. beta_user: bool,
  9. }
  10. fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
  11. // 一些简单的逻辑来检查用户是否是 beta
  12. req.headers.get("beta-flag").is_some()
  13. }
  14. #[async_trait]
  15. impl ProxyHttp for MyProxy {
  16. type CTX = MyCtx;
  17. fn new_ctx(&self) -> Self::CTX {
  18. MyCtx { beta_user: false }
  19. }
  20. async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
  21. ctx.beta_user = check_beta_user(session.req_header());
  22. Ok(false)
  23. }
  24. async fn upstream_peer(
  25. &self,
  26. _session: &mut Session,
  27. ctx: &mut Self::CTX,
  28. ) -> Result<Box<HttpPeer>> {
  29. let mut req_counter = REQ_COUNTER.lock().unwrap();
  30. *req_counter += 1;
  31. let addr = if ctx.beta_user {
  32. let mut beta_count = self.beta_counter.lock().unwrap();
  33. *beta_count += 1;
  34. info!("I'm a beta user #{beta_count}");
  35. ("1.0.0.1", 443)
  36. } else {
  37. info!("I'm a user #{req_counter}");
  38. ("1.1.1.1", 443)
  39. };
  40. let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
  41. Ok(peer)
  42. }
  43. }

完整的示例可以在 pingora-proxy/examples/ctx.rs 下找到。您可以使用 cargo 运行它:

  1. RUST_LOG=INFO cargo run --example ctx