tokio是rust的async runtime
Hyper 是lowlevel的http框架,基于tokio
Tower 抽象了一套Service, Layer,用来模块化, tower不和hyper绑定
用rust写http服务没有想象中那么简单,一些原本在golang nodejs中看起来很简单的事,放到rust中会变得很复杂,最近正好在看axum,一个基于tower的http框架,所以研究一下tower
Get Started
初始化项目
cargo new --bin tower_demo
添加依赖
[package]name = "tower_demo"version = "0.1.0"edition = "2021"# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies]tower = "0.4"log = "0.4"tokio = { version = "1", features = ["full"] }hyper = { version = "0.14", features = ["full"] }futures = "0.3.21"env_logger = "0.9.0"
HelloWorld
use std::convert::Infallible;use hyper::service::{make_service_fn, service_fn};use hyper::{Body, Request, Response, Server};use std::net::SocketAddr;async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {Ok(Response::new(Body::from("hello world")))}#[tokio::main]async fn main() {env_logger::init();let addr = SocketAddr::from(([127, 0, 0, 1], 3000));let make_service = make_service_fn(|_conn| async {Ok::<_, Infallible>(service_fn(handle))});let server = Server::bind(&addr).serve(make_service);if let Err(e) = server.await {log::error!("server err {}", e);}}
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>whereF: FnMut(&Target) -> Ret,Ret: Future,{MakeServiceFn { f }}
make_service_fn 返回的是一个 MakeServiceFn
let server = Server::bind(&addr).serve(make_service);pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>whereI: Accept,I::Error: Into<Box<dyn StdError + Send + Sync>>,I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,S: MakeServiceRef<I::Conn, Body, ResBody = B>,S::Error: Into<Box<dyn StdError + Send + Sync>>,B: HttpBody + 'static,B::Error: Into<Box<dyn StdError + Send + Sync>>,E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,{let serve = self.protocol.serve(self.incoming, new_service);let spawn_all = serve.spawn_all();Server { spawn_all }}// Just a sort-of "trait alias" of `MakeService`, not to be implemented// by anyone, only used as bounds.pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody)> {type ResBody: HttpBody;type Error: Into<Box<dyn StdError + Send + Sync>>;type Service: HttpService<ReqBody, ResBody = Self::ResBody, Error = Self::Error>;type MakeError: Into<Box<dyn StdError + Send + Sync>>;type Future: Future<Output = Result<Self::Service, Self::MakeError>>;// Acting like a #[non_exhaustive] for associated types of this trait.//// Basically, no one outside of hyper should be able to set this type// or declare bounds on it, so it should prevent people from creating// trait objects or otherwise writing code that requires using *all*// of the associated types.//// Why? So we can add new associated types to this alias in the future,// if necessary.type __DontNameMe: self::sealed::CantImpl;fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>>;fn make_service_ref(&mut self, target: &Target) -> Self::Future;}impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for TwhereT: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>,E: Into<Box<dyn StdError + Send + Sync>>,ME: Into<Box<dyn StdError + Send + Sync>>,S: HttpService<IB, ResBody = OB, Error = E>,F: Future<Output = Result<S, ME>>,IB: HttpBody,OB: HttpBody,{type Error = E;type Service = S;type ResBody = OB;type MakeError = ME;type Future = F;type __DontNameMe = self::sealed::CantName;fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {self.poll_ready(cx)}fn make_service_ref(&mut self, target: &Target) -> Self::Future {self.call(target)}}
这个函数看上去很吓人,一大堆的泛型约束,先看这里的S, S是一个实现了 MakeServiceRef trait的类型, 并且MakeServiceRef 为所有T实现了这个trait,这个T,需要满足
- for<’a> Service<&’a Target, Error = ME, Response = S, Future = F>
- 这个看起来也很吓人,这里 for<’a> 说明 service的lifetime 和 T这个实例没关系,在rust中,这叫做 High rank trait bound, (HRTB), 通常在clousure中会用到 参考 https://stackoverflow.com/questions/35592750/how-does-for-syntax-differ-from-a-regular-lifetime-bound
- Error 是 Into
> - Response的类型是 HttpService
, - Future的类型是 Future
继续钻进去看HttpService
impl<T, B1, B2> HttpService<B1> for TwhereT: tower_service::Service<Request<B1>, Response = Response<B2>>,B2: HttpBody,T::Error: Into<Box<dyn StdError + Send + Sync>>,{type ResBody = B2;type Error = T::Error;type Future = T::Future;fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {tower_service::Service::poll_ready(self, cx)}fn call(&mut self, req: Request<B1>) -> Self::Future {tower_service::Service::call(self, req)}}
HttpService 默认为所有的实现了 Service 这个trait的 T 加了默认实现,>>> 继续套娃
也就是说,只要是实现 tower Service 这个trait的 T,并且 Response是 Response
再回过头看service_fn
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>whereF: FnMut(Request<R>) -> S,S: Future,{ServiceFn {f,_req: PhantomData,}}/// Service returned by [`service_fn`]pub struct ServiceFn<F, R> {f: F,_req: PhantomData<fn(R)>,}impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>for ServiceFn<F, ReqBody>whereF: FnMut(Request<ReqBody>) -> Ret,ReqBody: HttpBody,Ret: Future<Output = Result<Response<ResBody>, E>>,E: Into<Box<dyn StdError + Send + Sync>>,ResBody: HttpBody,{type Response = crate::Response<ResBody>;type Error = E;type Future = Ret;fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {Poll::Ready(Ok(()))}fn call(&mut self, req: Request<ReqBody>) -> Self::Future {(self.f)(req)}}
service_fn 接受一个 FnMut clousure函数,返回 ServiceFn 结构体,ServiceFn这个结构体实现了 Service trait, 根据下面这3个约束条件我们指到,这个clousure
返回的这个Future output必须是一个Result
F: FnMut(Request<ReqBody>) -> Ret,ReqBody: HttpBody,Ret: Future<Output = Result<Response<ResBody>, E>>,
这个F 就和我们的handler 对上了, 我们知道async函数本质就是返回了一个Future, 这里 Infallible 是一个有意思的Enum, 字面意思就是“不会错的”, 表示这个Error永远不会发生,因为Infallible 是个没有值得enum,你无法构造一个Infallible 类型的变量出来
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {Ok(Response::new(Body::from("hello world")))}#[stable(feature = "convert_infallible", since = "1.34.0")]#[derive(Copy)]pub enum Infallible {}
到此我们就搞出了一个 tower_service::Service 类型,但事情还没完,serve函数要的是一个 MakeServiceRef, 再往上看 make_service_fn, 这个函数可以理解为一个工厂函数,返回的是一个Future,这个Future返回的是 Service
impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>whereF: FnMut(&Target) -> Ret,Ret: Future<Output = Result<Svc, MkErr>>,MkErr: Into<Box<dyn StdError + Send + Sync>>,{type Error = MkErr;type Response = Svc;type Future = Ret;fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {Poll::Ready(Ok(()))}fn call(&mut self, target: &'t Target) -> Self::Future {(self.f)(target)}}
关于Service这个trait,主要关注的是 call 这个函数,这个函数执行了传入的 FnMut。
到这里基本上搞清楚 make_service_fn, service_fn, handler, 和 serve之间的关系,最后看下 serve返回的是什么。
let server = Server::bind(&addr).serve(make_service);if let Err(e) = server.await {println!("server err {}", e);}
看到server.await 就指到 server一定是一个Future
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>whereI: Accept<Conn = IO, Error = IE>,IE: Into<Box<dyn StdError + Send + Sync>>,IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,S: MakeServiceRef<IO, Body, ResBody = B>,S::Error: Into<Box<dyn StdError + Send + Sync>>,B: HttpBody + 'static,B::Error: Into<Box<dyn StdError + Send + Sync>>,E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,{type Output = crate::Result<()>;fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {self.project().spawn_all.poll_watch(cx, &NoopWatcher)}}
直接看poll_watch
pub(super) fn poll_watch<W>(self: Pin<&mut Self>,cx: &mut task::Context<'_>,watcher: &W,) -> Poll<crate::Result<()>>whereE: NewSvcExec<IO, S::Future, S::Service, E, W>,W: Watcher<IO, S::Service, E>,{let mut me = self.project();loop {if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {let fut = NewSvcTask::new(connecting, watcher.clone());me.serve.as_mut().project().protocol.exec.execute_new_svc(fut);} else {return Poll::Ready(Ok(()));}}}
这里 poll_next_ ready之后会得到一个 类型为Connecting的Future, 当这个future resolve之后,就会得到一个connection, NewSvcTast::new 返回的同样也是一个future, 这个poll会一直loop下去,直到 poll_next 返回 None 为止
这里的project() 和 Pin 有关 参考 下面这篇文章
Pin
