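tokio is Rust's async runtime.
Hyper is a low-level HTTP library built on tokio.
Tower defines a set of abstractions, Service and Layer, for building modular components; tower is not tied to hyper.
Writing an HTTP service in Rust is not as simple as you might expect. Things that look trivial in Go or Node.js can get complicated in Rust. I have recently been looking at axum, an HTTP framework built on tower, so this is a good time to dig into tower.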
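To make the Service / Layer idea concrete before touching hyper, here is a minimal sketch. It does not use tower itself: the two traits below are simplified, synchronous stand-ins (the real `Service` is asynchronous, with `poll_ready` and a `call` that returns a future), and `Hello`, `Counted`, and `CountLayer` are hypothetical names made up for illustration.

```rust
// Simplified, synchronous stand-ins for tower's `Service` and `Layer`.
// The real `Service` has `poll_ready` and a `call` that returns a Future.
trait Service<Request> {
    type Response;
    fn call(&mut self, req: Request) -> Self::Response;
}

trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

// A leaf service that builds a greeting (hypothetical example type).
struct Hello;

impl Service<String> for Hello {
    type Response = String;
    fn call(&mut self, req: String) -> String {
        format!("hello {req}")
    }
}

// Middleware: counts calls, then delegates to the wrapped service.
struct Counted<S> {
    inner: S,
    calls: usize,
}

impl<S, R> Service<R> for Counted<S>
where
    S: Service<R>,
{
    type Response = S::Response;
    fn call(&mut self, req: R) -> S::Response {
        self.calls += 1;
        self.inner.call(req)
    }
}

// The layer is the reusable recipe that wraps any service in `Counted`.
struct CountLayer;

impl<S> Layer<S> for CountLayer {
    type Service = Counted<S>;
    fn layer(&self, inner: S) -> Counted<S> {
        Counted { inner, calls: 0 }
    }
}
```

The point of the split is that `CountLayer` knows nothing about `Hello`: a layer can wrap any service, which is what makes the pieces composable.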
Get Started
Initialize the project
cargo new --bin tower_demo
Add dependencies
[package]
name = "tower_demo"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tower = "0.4"
log = "0.4"
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14", features = ["full"] }
futures = "0.3.21"
env_logger = "0.9.0"
HelloWorld
use std::convert::Infallible;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::net::SocketAddr;

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("hello world")))
}

#[tokio::main]
async fn main() {
    env_logger::init();
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let make_service = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle))
    });
    let server = Server::bind(&addr).serve(make_service);
    if let Err(e) = server.await {
        log::error!("server err {}", e);
    }
}
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
where
    F: FnMut(&Target) -> Ret,
    Ret: Future,
{
    MakeServiceFn { f }
}
make_service_fn returns a MakeServiceFn, which is then passed to serve:
let server = Server::bind(&addr).serve(make_service);
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
{
    let serve = self.protocol.serve(self.incoming, new_service);
    let spawn_all = serve.spawn_all();
    Server { spawn_all }
}
// Just a sort-of "trait alias" of `MakeService`, not to be implemented
// by anyone, only used as bounds.
pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody)> {
    type ResBody: HttpBody;
    type Error: Into<Box<dyn StdError + Send + Sync>>;
    type Service: HttpService<ReqBody, ResBody = Self::ResBody, Error = Self::Error>;
    type MakeError: Into<Box<dyn StdError + Send + Sync>>;
    type Future: Future<Output = Result<Self::Service, Self::MakeError>>;

    // Acting like a #[non_exhaustive] for associated types of this trait.
    //
    // Basically, no one outside of hyper should be able to set this type
    // or declare bounds on it, so it should prevent people from creating
    // trait objects or otherwise writing code that requires using *all*
    // of the associated types.
    //
    // Why? So we can add new associated types to this alias in the future,
    // if necessary.
    type __DontNameMe: self::sealed::CantImpl;

    fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>>;

    fn make_service_ref(&mut self, target: &Target) -> Self::Future;
}
impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for T
where
    T: for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>,
    E: Into<Box<dyn StdError + Send + Sync>>,
    ME: Into<Box<dyn StdError + Send + Sync>>,
    S: HttpService<IB, ResBody = OB, Error = E>,
    F: Future<Output = Result<S, ME>>,
    IB: HttpBody,
    OB: HttpBody,
{
    type Error = E;
    type Service = S;
    type ResBody = OB;
    type MakeError = ME;
    type Future = F;
    type __DontNameMe = self::sealed::CantName;

    fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {
        self.poll_ready(cx)
    }

    fn make_service_ref(&mut self, target: &Target) -> Self::Future {
        self.call(target)
    }
}
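The serve function looks intimidating, with a long list of generic bounds. Start with S: it is a type that implements the MakeServiceRef trait, and hyper provides a blanket implementation of MakeServiceRef for every T that satisfies the bound below. (Inside that impl, S names the Response type, not the S from serve.)
- for<'a> Service<&'a Target, Error = ME, Response = S, Future = F>
- This also looks scary. for<'a> means the bound must hold for every lifetime 'a, so the lifetime of the borrowed Target is not tied to the T instance. Rust calls this a higher-ranked trait bound (HRTB); it shows up most often with closures. See https://stackoverflow.com/questions/35592750/how-does-for-syntax-differ-from-a-regular-lifetime-bound
- Error (ME) is Into<Box<dyn StdError + Send + Sync>>
- Response (S) is an HttpService<IB, ResBody = OB, Error = E>
- Future (F) is a Future<Output = Result<S, ME>>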
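A small standalone sketch of why the for<'a> is needed. This is not hyper code: `apply_to_local` and `trim` are hypothetical names, and a plain `Fn` bound stands in for `Service<&'a Target>`. The situation is the same, though: the callee borrows a value that the caller of the generic function can never name a lifetime for.

```rust
// `for<'a>` means: F must work for *every* lifetime 'a, not for one
// specific lifetime fixed when `apply_to_local` is instantiated.
fn apply_to_local<F>(f: F) -> usize
where
    F: for<'a> Fn(&'a str) -> &'a str,
{
    // `local` lives only inside this function, so the caller cannot name
    // its lifetime; the higher-ranked bound is what lets us pass `&local`.
    let local = String::from("  tower  ");
    f(&local).len()
}

// An ordinary function with an elided lifetime already satisfies the bound.
fn trim(s: &str) -> &str {
    s.trim()
}
```

In hyper the borrowed value is the connection: the make-service is called with `&Target` for each new connection, and that borrow only lasts for the duration of the call.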
Next, dig into HttpService:
impl<T, B1, B2> HttpService<B1> for T
where
    T: tower_service::Service<Request<B1>, Response = Response<B2>>,
    B2: HttpBody,
    T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type ResBody = B2;
    type Error = T::Error;
    type Future = T::Future;

    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        tower_service::Service::poll_ready(self, cx)
    }

    fn call(&mut self, req: Request<B1>) -> Self::Future {
        tower_service::Service::call(self, req)
    }
}
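HttpService has a blanket implementation for every T that implements the Service trait with suitable associated types, so the nesting continues.
In other words, any T that implements tower's Service trait for Request<B1>, with Response = Response<B2> where B2: HttpBody and an error type convertible into a boxed error, automatically implements HttpService.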
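The blanket-impl "trait alias" pattern can be sketched without hyper. Everything below is a simplified stand-in written for illustration: `Service` is reduced to a single synchronous method, `Request` and `Response` are bare wrappers, and `call_http`, `Upper`, and `serve_one` are hypothetical names.

```rust
// Stand-in for tower's `Service`, reduced to one synchronous method.
trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

// Stand-ins for http's `Request<B>` and `Response<B>`.
struct Request<B>(B);
struct Response<B>(B);

// The "alias" trait: narrower than `Service`, meant to be used as a bound.
trait HttpService<B1> {
    type ResBody;
    fn call_http(&mut self, req: Request<B1>) -> Response<Self::ResBody>;
}

// Blanket impl: every `Service<Request<B1>, Response = Response<B2>>`
// is automatically an `HttpService<B1>`.
impl<T, B1, B2> HttpService<B1> for T
where
    T: Service<Request<B1>, Response = Response<B2>>,
{
    type ResBody = B2;
    fn call_http(&mut self, req: Request<B1>) -> Response<B2> {
        Service::call(self, req)
    }
}

// A plain service that never mentions `HttpService`.
struct Upper;

impl Service<Request<String>> for Upper {
    type Response = Response<String>;
    fn call(&mut self, req: Request<String>) -> Response<String> {
        Response(req.0.to_uppercase())
    }
}

// Accepts anything usable as an `HttpService`, the way hyper's bounds do.
fn serve_one<S>(mut svc: S, body: &str) -> String
where
    S: HttpService<String, ResBody = String>,
{
    svc.call_http(Request(body.to_string())).0
}
```

`Upper` only implements `Service`, yet `serve_one(Upper, "hello")` type-checks because the blanket impl supplies `HttpService` for free.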
Now go back to service_fn:
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
where
    F: FnMut(Request<R>) -> S,
    S: Future,
{
    ServiceFn {
        f,
        _req: PhantomData,
    }
}

/// Service returned by [`service_fn`]
pub struct ServiceFn<F, R> {
    f: F,
    _req: PhantomData<fn(R)>,
}

impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
    for ServiceFn<F, ReqBody>
where
    F: FnMut(Request<ReqBody>) -> Ret,
    ReqBody: HttpBody,
    Ret: Future<Output = Result<Response<ResBody>, E>>,
    E: Into<Box<dyn StdError + Send + Sync>>,
    ResBody: HttpBody,
{
    type Response = crate::Response<ResBody>;
    type Error = E;
    type Future = Ret;

    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        (self.f)(req)
    }
}
service_fn takes an FnMut closure and returns a ServiceFn struct, which implements the Service trait. From the three bounds below we know that the future returned by the closure must output a Result<Response<ResBody>, E>:
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: HttpBody,
Ret: Future<Output = Result<Response<ResBody>, E>>,
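This F lines up with our handler: an async fn is essentially a function that returns a Future. Infallible is an interesting enum. As the name suggests, it means "cannot fail": the error can never occur, because Infallible has no variants and you cannot construct a value of that type.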
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("hello world")))
}
#[stable(feature = "convert_infallible", since = "1.34.0")]
#[derive(Copy)]
pub enum Infallible {}
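A short sketch of what "no values" buys you in practice. `greet` and `unwrap_infallible` are hypothetical helpers written for this post; only `std::convert::Infallible` comes from the standard library.

```rust
use std::convert::Infallible;

// A fallible-looking signature whose error type has no values.
fn greet(name: &str) -> Result<String, Infallible> {
    Ok(format!("hello {name}"))
}

// Because `Infallible` has no variants, an empty `match` proves to the
// compiler that the `Err` arm is unreachable; no panic path is needed.
fn unwrap_infallible<T>(res: Result<T, Infallible>) -> T {
    match res {
        Ok(value) => value,
        Err(never) => match never {},
    }
}
```

This is why the handler can keep the `Result` shape that the Service bounds require without ever having an error to report.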
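At this point we have a tower_service::Service, but we are not done: serve wants a MakeServiceRef. Look back at make_service_fn. It can be understood as a factory: the MakeServiceFn it returns is itself a Service whose call returns a Future, and that Future resolves to the Service that handles requests.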
impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>
where
    F: FnMut(&Target) -> Ret,
    Ret: Future<Output = Result<Svc, MkErr>>,
    MkErr: Into<Box<dyn StdError + Send + Sync>>,
{
    type Error = MkErr;
    type Response = Svc;
    type Future = Ret;

    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, target: &'t Target) -> Self::Future {
        (self.f)(target)
    }
}
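For the Service trait, the method to focus on is call, which invokes the FnMut that was passed in.
That mostly clears up how make_service_fn, service_fn, the handler, and serve relate to each other. Finally, look at what serve returns.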
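The two levels of nesting can be reproduced with nothing but the standard library. This is a sketch under stated assumptions, not hyper's implementation: the `Service` trait below is a local copy of the shape of `tower_service::Service`, `FnService` is a hypothetical adapter that plays the role of both `ServiceFn` and `MakeServiceFn`, requests and responses are plain `String`s, and `block_on` is a toy executor that only suits futures that never wait on I/O.

```rust
use std::convert::Infallible;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the shape of `tower_service::Service`.
trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

// Hypothetical adapter: turns a closure that returns a future into a Service.
struct FnService<F>(F);

impl<F, Req, Fut, Res, E> Service<Req> for FnService<F>
where
    F: FnMut(Req) -> Fut,
    Fut: Future<Output = Result<Res, E>>,
{
    type Response = Res;
    type Error = E;
    type Future = Fut;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), E>> {
        Poll::Ready(Ok(())) // a closure is always ready
    }

    fn call(&mut self, req: Req) -> Fut {
        (self.0)(req) // just run the closure
    }
}

// An async fn satisfies `FnMut(String) -> impl Future<Output = Result<..>>`.
async fn handle(req: String) -> Result<String, Infallible> {
    Ok(format!("hello {req}"))
}

// Toy executor: polls with a no-op waker until the future completes.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// One make-service call per connection, then one service call per request.
fn demo() -> String {
    // Outer factory: given a connection, resolve to a request-level service.
    let mut make_service =
        FnService(|_conn: &str| async { Ok::<_, Infallible>(FnService(handle)) });

    let mut svc = match block_on(make_service.call("127.0.0.1:3000")) {
        Ok(svc) => svc,
        Err(never) => match never {},
    };
    match block_on(svc.call("world".to_string())) {
        Ok(body) => body,
        Err(never) => match never {},
    }
}
```

Calling `demo()` walks the same path the server does: call the factory, await the service it produces, then call that service with a request.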
let server = Server::bind(&addr).serve(make_service);
if let Err(e) = server.await {
    println!("server err {}", e);
}
Since we call server.await, server must be a Future:
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
where
    I: Accept<Conn = IO, Error = IE>,
    IE: Into<Box<dyn StdError + Send + Sync>>,
    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<IO, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
    E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
    type Output = crate::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.project().spawn_all.poll_watch(cx, &NoopWatcher)
    }
}
Go straight to poll_watch:
pub(super) fn poll_watch<W>(
    self: Pin<&mut Self>,
    cx: &mut task::Context<'_>,
    watcher: &W,
) -> Poll<crate::Result<()>>
where
    E: NewSvcExec<IO, S::Future, S::Service, E, W>,
    W: Watcher<IO, S::Service, E>,
{
    let mut me = self.project();
    loop {
        if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
            let fut = NewSvcTask::new(connecting, watcher.clone());
            me.serve
                .as_mut()
                .project()
                .protocol
                .exec
                .execute_new_svc(fut);
        } else {
            return Poll::Ready(Ok(()));
        }
    }
}
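Here, once poll_next_ is ready it yields a future of type Connecting; when that future resolves you get a connection. NewSvcTask::new also returns a future, which is handed to the executor. The loop keeps going until poll_next_ returns None; if poll_next_ is not ready yet, ready! returns Poll::Pending from poll_watch and the loop picks up again on the next poll.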
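The shape of that loop can be reproduced in a few lines of std-only Rust. This is a sketch, not hyper's code: `Incoming`, `AcceptLoop`, and `run` are hypothetical names, connections are plain `u32` ids, and instead of spawning a task per connection the loop just records the id. The source is scripted so that a `Poll::Pending` entry models "no connection available yet".

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

// Hypothetical connection source: each entry is the result of one poll.
struct Incoming {
    script: VecDeque<Poll<Option<u32>>>,
}

impl Incoming {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        match self.script.pop_front() {
            Some(Poll::Pending) => {
                // A real source would register the waker with the I/O driver;
                // this sketch simply asks to be polled again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(item) => item,
            None => Poll::Ready(None),
        }
    }
}

// Plays the role of `Server`: a future that accepts until the source ends.
struct AcceptLoop {
    incoming: Incoming,
    accepted: Vec<u32>,
}

impl Future for AcceptLoop {
    type Output = Vec<u32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Every field here is `Unpin`, so `get_mut` is enough. hyper uses
        // `project()` because its fields are not guaranteed to be `Unpin`.
        let me = self.get_mut();
        loop {
            // `ready!` returns `Poll::Pending` from `poll` when the source
            // has nothing yet; otherwise it unwraps the `Poll::Ready` value.
            match ready!(me.incoming.poll_next(cx)) {
                Some(conn) => me.accepted.push(conn), // hyper spawns a task here
                None => return Poll::Ready(std::mem::take(&mut me.accepted)),
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives the future by hand and counts how many polls it took.
fn run(script: Vec<Poll<Option<u32>>>) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(AcceptLoop {
        incoming: Incoming { script: script.into() },
        accepted: Vec::new(),
    });
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

With a script of two connections, one `Pending`, a third connection, and then `None`, the future accepts all three connections in two polls: the first poll stops at the `Pending`, the second one finishes.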
The project() calls here are related to Pin (pin projection); see the following article:
Pin