tokio是rust的async runtime
Hyper 是lowlevel的http框架,基于tokio
Tower 抽象了一套Service, Layer,用来模块化, tower不和hyper绑定
用rust写http服务没有想象中那么简单,一些原本在golang nodejs中看起来很简单的事,放到rust中会变得很复杂,最近正好在看axum,一个基于tower的http框架,所以研究一下tower

Get Started

初始化项目

  1. cargo new --bin tower_demo

添加依赖

  1. [package]
  2. name = "tower_demo"
  3. version = "0.1.0"
  4. edition = "2021"
  5. # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
  6. [dependencies]
  7. tower = "0.4"
  8. log = "0.4"
  9. tokio = { version = "1", features = ["full"] }
  10. hyper = { version = "0.14", features = ["full"] }
  11. futures = "0.3.21"
  12. env_logger = "0.9.0"

HelloWorld

  1. use std::convert::Infallible;
  2. use hyper::service::{make_service_fn, service_fn};
  3. use hyper::{Body, Request, Response, Server};
  4. use std::net::SocketAddr;
  5. async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
  6. Ok(Response::new(Body::from("hello world")))
  7. }
  8. #[tokio::main]
  9. async fn main() {
  10. env_logger::init();
  11. let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
  12. let make_service = make_service_fn(|_conn| async {
  13. Ok::<_, Infallible>(service_fn(handle))
  14. });
  15. let server = Server::bind(&addr).serve(make_service);
  16. if let Err(e) = server.await {
  17. log::error!("server err {}", e);
  18. }
  19. }
  1. pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
  2. where
  3. F: FnMut(&Target) -> Ret,
  4. Ret: Future,
  5. {
  6. MakeServiceFn { f }
  7. }

make_service_fn 返回的是一个 MakeServiceFn 结构体 这个后面看,入参是 F, F是一个FnMut clousure,接受一个Target类型的入参,返回Ret,Ret是一个Future

  1. let server = Server::bind(&addr).serve(make_service);
  2. pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
  3. where
  4. I: Accept,
  5. I::Error: Into<Box<dyn StdError + Send + Sync>>,
  6. I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
  7. S: MakeServiceRef<I::Conn, Body, ResBody = B>,
  8. S::Error: Into<Box<dyn StdError + Send + Sync>>,
  9. B: HttpBody + 'static,
  10. B::Error: Into<Box<dyn StdError + Send + Sync>>,
  11. E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
  12. E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
  13. {
  14. let serve = self.protocol.serve(self.incoming, new_service);
  15. let spawn_all = serve.spawn_all();
  16. Server { spawn_all }
  17. }
  18. // Just a sort-of "trait alias" of `MakeService`, not to be implemented
  19. // by anyone, only used as bounds.
  20. pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody)> {
  21. type ResBody: HttpBody;
  22. type Error: Into<Box<dyn StdError + Send + Sync>>;
  23. type Service: HttpService<ReqBody, ResBody = Self::ResBody, Error = Self::Error>;
  24. type MakeError: Into<Box<dyn StdError + Send + Sync>>;
  25. type Future: Future<Output = Result<Self::Service, Self::MakeError>>;
  26. // Acting like a #[non_exhaustive] for associated types of this trait.
  27. //
  28. // Basically, no one outside of hyper should be able to set this type
  29. // or declare bounds on it, so it should prevent people from creating
  30. // trait objects or otherwise writing code that requires using *all*
  31. // of the associated types.
  32. //
  33. // Why? So we can add new associated types to this alias in the future,
  34. // if necessary.
  35. type __DontNameMe: self::sealed::CantImpl;
  36. fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>>;
  37. fn make_service_ref(&mut self, target: &Target) -> Self::Future;
  38. }
  39. impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for T
  40. where
  41. T: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>,
  42. E: Into<Box<dyn StdError + Send + Sync>>,
  43. ME: Into<Box<dyn StdError + Send + Sync>>,
  44. S: HttpService<IB, ResBody = OB, Error = E>,
  45. F: Future<Output = Result<S, ME>>,
  46. IB: HttpBody,
  47. OB: HttpBody,
  48. {
  49. type Error = E;
  50. type Service = S;
  51. type ResBody = OB;
  52. type MakeError = ME;
  53. type Future = F;
  54. type __DontNameMe = self::sealed::CantName;
  55. fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {
  56. self.poll_ready(cx)
  57. }
  58. fn make_service_ref(&mut self, target: &Target) -> Self::Future {
  59. self.call(target)
  60. }
  61. }

这个函数看上去很吓人,一大堆的泛型约束,先看这里的S, S是一个实现了 MakeServiceRef trait的类型, 并且MakeServiceRef 为所有T实现了这个trait,这个T,需要满足

  • for<’a> Service<&’a Target, Error = ME, Response = S, Future = F>
  • Error 是 Into>
  • Response的类型是 HttpService,
  • Future的类型是 Future>, 也就是说Future的Output返回值是一个Result<HttpService<IB, ResBody = OB, Error = E>, Into<Box<dyn StdError + Send + Sync>>> >>> 这简直就是毛子套娃

继续钻进去看HttpService

  1. impl<T, B1, B2> HttpService<B1> for T
  2. where
  3. T: tower_service::Service<Request<B1>, Response = Response<B2>>,
  4. B2: HttpBody,
  5. T::Error: Into<Box<dyn StdError + Send + Sync>>,
  6. {
  7. type ResBody = B2;
  8. type Error = T::Error;
  9. type Future = T::Future;
  10. fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
  11. tower_service::Service::poll_ready(self, cx)
  12. }
  13. fn call(&mut self, req: Request<B1>) -> Self::Future {
  14. tower_service::Service::call(self, req)
  15. }
  16. }

HttpService 默认为所有的实现了 Service 这个trait的 T 加了默认实现,>>> 继续套娃
也就是说,只要是实现 tower Service 这个trait的 T,并且 Response是 Response, 都可以是HttpService

再回过头看service_fn

  1. pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
  2. where
  3. F: FnMut(Request<R>) -> S,
  4. S: Future,
  5. {
  6. ServiceFn {
  7. f,
  8. _req: PhantomData,
  9. }
  10. }
  11. /// Service returned by [`service_fn`]
  12. pub struct ServiceFn<F, R> {
  13. f: F,
  14. _req: PhantomData<fn(R)>,
  15. }
  16. impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
  17. for ServiceFn<F, ReqBody>
  18. where
  19. F: FnMut(Request<ReqBody>) -> Ret,
  20. ReqBody: HttpBody,
  21. Ret: Future<Output = Result<Response<ResBody>, E>>,
  22. E: Into<Box<dyn StdError + Send + Sync>>,
  23. ResBody: HttpBody,
  24. {
  25. type Response = crate::Response<ResBody>;
  26. type Error = E;
  27. type Future = Ret;
  28. fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
  29. Poll::Ready(Ok(()))
  30. }
  31. fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
  32. (self.f)(req)
  33. }
  34. }

service_fn 接受一个 FnMut clousure函数,返回 ServiceFn 结构体,ServiceFn这个结构体实现了 Service trait, 根据下面这3个约束条件我们指到,这个clousure
返回的这个Future output必须是一个Result>

  1. F: FnMut(Request<ReqBody>) -> Ret,
  2. ReqBody: HttpBody,
  3. Ret: Future<Output = Result<Response<ResBody>, E>>,

这个F 就和我们的handler 对上了, 我们知道async函数本质就是返回了一个Future, 这里 Infallible 是一个有意思的Enum, 字面意思就是“不会错的”, 表示这个Error永远不会发生,因为Infallible 是个没有值得enum,你无法构造一个Infallible 类型的变量出来

  1. async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
  2. Ok(Response::new(Body::from("hello world")))
  3. }
  4. #[stable(feature = "convert_infallible", since = "1.34.0")]
  5. #[derive(Copy)]
  6. pub enum Infallible {}

到此我们就搞出了一个 tower_service::Service 类型,但事情还没完,serve函数要的是一个 MakeServiceRef, 再往上看 make_service_fn, 这个函数可以理解为一个工厂函数,返回的是一个Future,这个Future返回的是 Service

  1. impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>
  2. where
  3. F: FnMut(&Target) -> Ret,
  4. Ret: Future<Output = Result<Svc, MkErr>>,
  5. MkErr: Into<Box<dyn StdError + Send + Sync>>,
  6. {
  7. type Error = MkErr;
  8. type Response = Svc;
  9. type Future = Ret;
  10. fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
  11. Poll::Ready(Ok(()))
  12. }
  13. fn call(&mut self, target: &'t Target) -> Self::Future {
  14. (self.f)(target)
  15. }
  16. }

关于Service这个trait,主要关注的是 call 这个函数,这个函数执行了传入的 FnMut。
到这里基本上搞清楚 make_service_fn, service_fn, handler, 和 serve之间的关系,最后看下 serve返回的是什么。

  1. let server = Server::bind(&addr).serve(make_service);
  2. if let Err(e) = server.await {
  3. println!("server err {}", e);
  4. }

看到server.await 就指到 server一定是一个Future

  1. impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
  2. where
  3. I: Accept<Conn = IO, Error = IE>,
  4. IE: Into<Box<dyn StdError + Send + Sync>>,
  5. IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
  6. S: MakeServiceRef<IO, Body, ResBody = B>,
  7. S::Error: Into<Box<dyn StdError + Send + Sync>>,
  8. B: HttpBody + 'static,
  9. B::Error: Into<Box<dyn StdError + Send + Sync>>,
  10. E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
  11. E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
  12. {
  13. type Output = crate::Result<()>;
  14. fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
  15. self.project().spawn_all.poll_watch(cx, &NoopWatcher)
  16. }
  17. }

直接看poll_watch

  1. pub(super) fn poll_watch<W>(
  2. self: Pin<&mut Self>,
  3. cx: &mut task::Context<'_>,
  4. watcher: &W,
  5. ) -> Poll<crate::Result<()>>
  6. where
  7. E: NewSvcExec<IO, S::Future, S::Service, E, W>,
  8. W: Watcher<IO, S::Service, E>,
  9. {
  10. let mut me = self.project();
  11. loop {
  12. if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
  13. let fut = NewSvcTask::new(connecting, watcher.clone());
  14. me.serve
  15. .as_mut()
  16. .project()
  17. .protocol
  18. .exec
  19. .execute_new_svc(fut);
  20. } else {
  21. return Poll::Ready(Ok(()));
  22. }
  23. }
  24. }

这里 poll_next_ ready之后会得到一个 类型为Connecting的Future, 当这个future resolve之后,就会得到一个connection, NewSvcTast::new 返回的同样也是一个future, 这个poll会一直loop下去,直到 poll_next 返回 None 为止

这里的project() 和 Pin 有关 参考 下面这篇文章
Pin