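Go's net/rpc is a standard library package with little code but a clever design. If we want to go deeper into idiomatic Go, or into how RPC works under the hood, it is an excellent learning resource. Here we analyze the core of the Go rpc package, starting from its source code.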
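An RPC framework has to solve the following core technical problems:
- Cross-platform, cross-language communication: serializing and deserializing data
- Determining which RPC request a given response belongs to
- Keeping track of the goroutines that are waiting for replies after their requests have been sent
- Supporting different calling styles: callbacks, blocking calls and futures
Next, we analyze the core of Go's rpc standard library by looking at the RPC client and server implementations in turn.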
The client makes a remote call
https://github.com/golang/go/blob/master/src/net/rpc/client_test.go
Following the official example, an RPC client is started and a remote call is made like this:
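```go
// Establish a TCP connection to the remote server
client, err := rpc.Dial("tcp", "127.0.0.1:8080")
// Besides plain TCP, connections can also be made over HTTP or with the JSON-RPC codec:
// jsonrpc.Dial("tcp", "127.0.0.1:8080")
// rpc.DialHTTP("tcp", "127.0.0.1:8080")
if err != nil {
	panic(err)
}
// Call the remote method S.Recv. The request is struct{}{} and the reply is
// decoded into reply (Reply is a struct type defined in client_test.go).
// This call blocks until the reply arrives.
var reply Reply
err = client.Call("S.Recv", &struct{}{}, &reply)
if err != nil {
	panic(err)
}
fmt.Printf("%#v\n", reply)
```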
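To try client and server together in one process, here is a minimal self-contained sketch. It assumes a hypothetical Arith service with a Multiply method, not the S.Recv service from client_test.go. It also listens on an ephemeral port (127.0.0.1:0) instead of a fixed 8080. rpc.NewServer, Server.Accept, rpc.Dial and Client.Call are the real net/rpc APIs.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// Args and Arith are hypothetical illustration types, not part of net/rpc.
type Args struct{ A, B int }

type Arith struct{}

// Multiply has the shape net/rpc requires: exported, (args, *reply) error.
func (t *Arith) Multiply(args Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// roundTrip starts a server on an ephemeral port, dials it and makes one blocking call.
func roundTrip(a, b int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()
	go srv.Accept(ln) // serves each accepted connection in its own goroutine

	client, err := rpc.Dial("tcp", ln.Addr().String())
	if err != nil {
		return 0, err
	}
	defer client.Close()

	var reply int
	err = client.Call("Arith.Multiply", Args{a, b}, &reply)
	return reply, err
}

func main() {
	got, err := roundTrip(6, 7)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // 42
}
```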
First, let's look at the Client struct, which holds all the state an RPC client needs to manage its remote calls.
```go
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	codec ClientCodec // client-side encoder/decoder

	reqMutex sync.Mutex // protects following; serializes writing of request
	request  Request

	mutex    sync.Mutex // protects following: seq, pending, closing, shutdown
	seq      uint64
	pending  map[uint64]*Call // calls that were sent but not yet answered
	closing  bool             // user has called Close
	shutdown bool             // server has told us to stop
}

// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}
```
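The Client struct manages outgoing remote calls. A single Client can be shared by many goroutines, so in principle a process needs only one client per server connection, and all subsequent requests are managed by it. The key fields are:
- codec ClientCodec: the codec that encodes requests when they are sent and decodes responses when they arrive. NewClient uses gob (binary, more efficient); the net/rpc/jsonrpc package provides a JSON codec.
- request Request: the request header, reused for every call.
- reqMutex sync.Mutex: locks request and serializes writes to the connection, so that concurrent calls do not interleave.
- seq uint64: the sequence number that identifies each request; it is incremented by one for every RPC request sent.
- mutex sync.Mutex: protects seq (as well as pending, closing and shutdown), because several goroutines may issue calls, and therefore modify seq, at the same time.
- pending map[uint64]*Call: a map, keyed by sequence number, that holds every call that has been sent but has not yet received a reply.
- closing bool: set when the user calls Close; no new calls are accepted afterwards.
- shutdown bool: set when the read loop ends because the connection failed; all pending calls are failed at that point.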
The Client struct stores requests that have been sent but not yet answered in a map[uint64]*Call. The Call struct looks like this:
```go
// Call represents an active RPC.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Receives *Call when Go is complete.
}
```
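Dial establishes an RPC connection to the target server. Its parameters are the network protocol and the server's IP:PORT. Internally, rpc.Dial calls net.Dial to open a TCP connection, wraps that connection with NewClient(conn), and returns the resulting client to the caller.
NewClient creates a gob codec, gobClientCodec, and passes it to NewClientWithCodec. That function starts a goroutine (go client.input()) dedicated to handling the responses to remote calls. Whenever a message arrives from the network, input runs the processing logic described below.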
```go
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn), nil
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
	encBuf := bufio.NewWriter(conn)
	client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
	return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
	client := &Client{
		codec:   codec,
		pending: make(map[uint64]*Call),
	}
	go client.input()
	return client
}
```
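Because NewClientWithCodec accepts any ClientCodec, the wire format can be swapped without touching Client. The sketch below uses the JSON codecs from net/rpc/jsonrpc (jsonrpc.NewClientCodec and jsonrpc.NewServerCodec) in place of gob. It assumes a hypothetical Echo service and connects both ends with an in-memory net.Pipe instead of TCP.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// Echo is a hypothetical service used only for this sketch.
type Echo struct{}

func (Echo) Say(in string, out *string) error {
	*out = "echo: " + in
	return nil
}

// callOverJSON wires a client and a server together through net.Pipe and uses
// the JSON codec on both ends instead of the default gob codec.
func callOverJSON(msg string) (string, error) {
	srv := rpc.NewServer()
	if err := srv.Register(Echo{}); err != nil {
		return "", err
	}
	clientConn, serverConn := net.Pipe()
	go srv.ServeCodec(jsonrpc.NewServerCodec(serverConn))

	client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(clientConn))
	defer client.Close() // closing the client ends the server's read loop too

	var out string
	err := client.Call("Echo.Say", msg, &out)
	return out, err
}

func main() {
	fmt.Println(callOverJSON("hi"))
}
```

Only the codec changes; Client.Call, the seq numbering and the pending map work exactly the same way.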
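In my view, Call is the most important piece of the whole RPC framework. This one method hides the entire process of sending a request, waiting, and waking the goroutine up when the reply arrives.
Call takes three parameters: the name of the remote method, the request arguments and the reply struct. Note that args and reply are of type interface{}, so any type can be passed in, which makes the API very flexible.
Calling Call invokes the given method on the given server. A remote call is fairly involved: it covers encoding, the network, processing on the remote server, the client receiving the response, and decoding. Yet Call does not start a new goroutine to drive this flow. The calling goroutine simply blocks on a channel, and the single input goroutine started in NewClientWithCodec signals that channel when the reply arrives.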
```go
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}
```
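Call is a thin wrapper around Go. The core logic lives in Go, which does the following:
- creates a new Call object
- fills in the Call's ServiceMethod, Args and Reply, plus the done channel passed as a parameter (allocating a buffered one if it is nil)
- sends the request with client.send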
```go
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}
```
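The comment about done needing "enough buffer for the number of simultaneous RPCs" suggests a fan-out pattern: several calls share one done channel sized to the number of outstanding calls. This sketch uses a hypothetical Square service served over net.Pipe.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Square is a hypothetical service for this sketch.
type Square struct{}

func (Square) Of(n int, out *int) error {
	*out = n * n
	return nil
}

// sumOfSquares issues n concurrent calls with client.Go. All of them share one
// done channel whose buffer holds every outstanding call, as Go requires.
func sumOfSquares(n int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(Square{}); err != nil {
		return 0, err
	}
	clientConn, serverConn := net.Pipe()
	go srv.ServeConn(serverConn)
	client := rpc.NewClient(clientConn)
	defer client.Close()

	done := make(chan *rpc.Call, n)
	replies := make([]int, n)
	for i := 0; i < n; i++ {
		client.Go("Square.Of", i, &replies[i], done)
	}
	sum := 0
	for i := 0; i < n; i++ {
		call := <-done // calls complete in whatever order the replies arrive
		if call.Error != nil {
			return 0, call.Error
		}
		sum += *call.Reply.(*int)
	}
	return sum, nil
}

func main() {
	fmt.Println(sumOfSquares(10))
}
```

Each completed *rpc.Call carries its own Reply pointer, so the receiving loop does not need to know which call finished first.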
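Next we examine the core function client.send. It sends the request for the remote call, increments the sequence number, and stores the Call that is waiting for a reply in the pending map.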
```go
func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock() // protects seq and pending
	if client.shutdown || client.closing {
		// The client is shut down or closing, so this send fails.
		client.mutex.Unlock()
		call.Error = ErrShutdown
		call.done()
		return
	}
	seq := client.seq
	client.seq++               // increment the request sequence number
	client.pending[seq] = call // park the call in the pending map
	client.mutex.Unlock()

	// Encode and send the request.
	// The seq travels to the server and comes back in the response,
	// which is how the client knows which request a reply belongs to.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args) // encode and write to the network
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}
```
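Now that we know how requests are sent, let's see how responses are received and how the sleeping goroutines are woken up. This logic lives in input(), which starts running as soon as the client is created in NewClientWithCodec.
- First, the client codec decodes the response header.
- input takes the sequence number response.Seq, looks up the matching call in the pending map, and removes it from the map. This step must hold the lock.
- Depending on the response, it then takes one of three paths: no matching call, an error reported by the server, or a normal reply.
- In the normal case, client.codec.ReadResponseBody decodes the response body into call.Reply, and call.done() wakes the waiting goroutine through its channel.
- Note in particular that the read loop exits only when reading a header or body from the codec fails (for example, the connection was closed or the data cannot be decoded). A server-side error or an unknown sequence number does not end it on its own. After the loop exits, the client enters the shutdown path: client.shutdown is set to true, and every call still cached in the pending map is woken up with the error.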
```go
func (client *Client) input() {
	var err error
	var response Response
	for err == nil {
		response = Response{}
		err = client.codec.ReadResponseHeader(&response)
		if err != nil {
			break
		}
		seq := response.Seq
		client.mutex.Lock()
		call := client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()

		switch {
		case call == nil:
			// No pending call for this seq: the body is drained and discarded.
			// err is only set (ending the loop) if reading that body fails.
			//
			// We've got no pending call. That usually means that
			// WriteRequest partially failed, and call was already
			// removed; response is a server telling us about an
			// error reading request body. We should still attempt
			// to read error body, but there's no one to give it to.
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
		case response.Error != "":
			// We've got an error response. Give this to the request;
			// any subsequent requests will get the ReadResponseBody
			// error if there is one.
			call.Error = ServerError(response.Error)
			err = client.codec.ReadResponseBody(nil)
			if err != nil {
				err = errors.New("reading error body: " + err.Error())
			}
			call.done()
		default:
			err = client.codec.ReadResponseBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	// Terminate pending calls.
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
	if debugLog && err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}
```
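The seq/pending mechanism shared by send and input can be studied without any networking. The following toy uses hypothetical types (toyCall, response and toyClient) that only imitate net/rpc's Client. It registers calls under increasing sequence numbers, delivers replies that arrive out of order, and ignores a reply with an unknown seq. Once the response stream ends, it fails whatever is still pending.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyCall, response and toyClient are hypothetical types imitating the
// seq -> pending bookkeeping of net/rpc's Client; they are not part of net/rpc.
type toyCall struct {
	Reply string
	Error error
	Done  chan *toyCall // buffered (cap 1) so completing a call never blocks
}

type response struct {
	Seq  uint64
	Body string
}

type toyClient struct {
	mu       sync.Mutex
	seq      uint64
	pending  map[uint64]*toyCall
	shutdown bool
}

// register plays the role of send: take the next seq and park the call in pending.
func (c *toyClient) register() *toyCall {
	call := &toyCall{Done: make(chan *toyCall, 1)}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.shutdown {
		call.Error = errors.New("client is shut down")
		call.Done <- call
		return call
	}
	c.pending[c.seq] = call
	c.seq++
	return call
}

// input plays the role of Client.input: match each response to its call by seq,
// and once the stream ends with err, fail every call that is still pending.
func (c *toyClient) input(responses <-chan response, err error) {
	for resp := range responses {
		c.mu.Lock()
		call := c.pending[resp.Seq]
		delete(c.pending, resp.Seq)
		c.mu.Unlock()
		if call == nil {
			continue // unknown seq: nobody to deliver to, keep reading
		}
		call.Reply = resp.Body
		call.Done <- call
	}
	c.mu.Lock()
	c.shutdown = true
	for seq, call := range c.pending {
		delete(c.pending, seq)
		call.Error = err
		call.Done <- call
	}
	c.mu.Unlock()
}

func demoDispatch() (first, second string, lostErr error) {
	c := &toyClient{pending: make(map[uint64]*toyCall)}
	a := c.register()    // seq 0
	b := c.register()    // seq 1
	lost := c.register() // seq 2, never answered

	responses := make(chan response, 3)
	responses <- response{Seq: 1, Body: "second"} // replies may arrive out of order
	responses <- response{Seq: 0, Body: "first"}
	responses <- response{Seq: 99, Body: "stray"} // no call has this seq
	close(responses)
	c.input(responses, errors.New("connection lost"))

	return (<-a.Done).Reply, (<-b.Done).Reply, (<-lost.Done).Error
}

func main() {
	fmt.Println(demoDispatch()) // first second connection lost
}
```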
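When we explicitly call client.Close, it does the following:
- acquires the lock and sets client.closing to true (returning ErrShutdown if it was already set)
- releases the lock and closes the client's codec with client.codec.Close(), which closes the underlying connection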
```go
// Close calls the underlying codec's Close method. If the connection is already
// shutting down, ErrShutdown is returned.
func (client *Client) Close() error {
	client.mutex.Lock()
	if client.closing {
		client.mutex.Unlock()
		return ErrShutdown
	}
	client.closing = true
	client.mutex.Unlock()
	return client.codec.Close()
}
```
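Calling client.Call as above is a synchronous, blocking RPC call. For an asynchronous call, use client.Go to get the Call and its Done channel, and receive from that channel when the result is needed. If the reply has already arrived, the data is available immediately; otherwise the receive blocks until it arrives. This is the pattern commonly called a future.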
```go
// Asynchronous call
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
```
The server receives a remote call
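We use an rpc server that speaks HTTP as the example for the source analysis. Below is the flow for starting an rpc server and registering a service. We define a Rect struct with Area and Perimeter methods.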
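```go
package main

import (
	"log"
	"net/http"
	"net/rpc"
)

type Params struct {
	Width, Height int
}

type Rect struct{}

// To be served over RPC, a method must:
// - be exported (and belong to an exported type)
// - take two arguments, both of exported or builtin types
// - the first argument carries the request from the client
// - the second argument carries the reply to the client and must be a pointer
// - return a single value of type error
func (r *Rect) Area(p Params, ret *int) error {
	*ret = p.Width * p.Height
	return nil
}

func (r *Rect) Perimeter(p Params, ret *int) error {
	*ret = (p.Width + p.Height) * 2
	return nil
}

func main() {
	rect := new(Rect)
	// Register a Rect service
	if err := rpc.Register(rect); err != nil {
		log.Fatal(err)
	}
	// Serve the registered services over HTTP
	rpc.HandleHTTP()
	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatal(err)
	}
}
```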
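The example above listens on a fixed port and relies on the global DefaultServer and DefaultServeMux. A self-contained variant may be easier to experiment with. This sketch mounts an rpc.NewServer() on a private http.ServeMux at rpc.DefaultRPCPath, serves it with net/http/httptest on a random port, and connects with rpc.DialHTTP. The helper name areaAndPerimeter is only for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/rpc"
)

type Params struct{ Width, Height int }

type Rect struct{}

func (r *Rect) Area(p Params, ret *int) error {
	*ret = p.Width * p.Height
	return nil
}

func (r *Rect) Perimeter(p Params, ret *int) error {
	*ret = (p.Width + p.Height) * 2
	return nil
}

// areaAndPerimeter serves Rect over HTTP on a test server and calls both methods.
func areaAndPerimeter(p Params) (area, perim int, err error) {
	srv := rpc.NewServer()
	if err = srv.Register(new(Rect)); err != nil {
		return
	}
	mux := http.NewServeMux()
	mux.Handle(rpc.DefaultRPCPath, srv) // *rpc.Server implements http.Handler
	ts := httptest.NewServer(mux)
	defer ts.Close()

	var client *rpc.Client
	if client, err = rpc.DialHTTP("tcp", ts.Listener.Addr().String()); err != nil {
		return
	}
	defer client.Close()
	if err = client.Call("Rect.Area", p, &area); err != nil {
		return
	}
	err = client.Call("Rect.Perimeter", p, &perim)
	return
}

func main() {
	fmt.Println(areaAndPerimeter(Params{Width: 3, Height: 4}))
}
```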
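rpc.Register calls DefaultServer.Register, which in turn calls server.register. register first uses reflection to obtain the type and value of the service rcvr, and then its service name. By default the service name is the name of rcvr's concrete type (the struct name, e.g. Rect), not the name of the variable. Only RegisterName, which passes useName = true, replaces it with an explicit name. When a client calls a server method, it names it explicitly, e.g. "Rect.Area", meaning the Area method of the server's Rect service.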
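register relies on Go's reflection, so let's recall how TypeOf() and ValueOf() behave. reflect.TypeOf() returns the type object (reflect.Type) of any value; through it, a program can inspect the value's type information. reflect.ValueOf() returns a reflect.Value that holds the value itself. A reflect.Value and the original value can be converted back and forth: ValueOf wraps a value, and Value.Interface unwraps it. reflect.Value is central to many reflective operations, such as calling a function through reflection.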
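The service name comes from exactly these reflection calls. In the sketch below, serviceName is a hypothetical helper that wraps the expression used in register. It shows that the name is taken from the type rather than the variable, and that an unnamed type yields an empty name (which register rejects).

```go
package main

import (
	"fmt"
	"reflect"
)

type Rect struct{}

// serviceName mirrors register's reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name().
func serviceName(rcvr interface{}) string {
	return reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name()
}

func main() {
	shape := new(Rect) // the variable name "shape" plays no role
	fmt.Println(reflect.TypeOf(shape))         // *main.Rect
	fmt.Println(reflect.ValueOf(shape).Kind()) // ptr
	fmt.Println(serviceName(shape))            // Rect
	fmt.Println(serviceName(Rect{}))           // Rect
}
```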
```go
func (server *Server) Register(rcvr interface{}) error {
	return server.register(rcvr, "", false)
}

func (server *Server) register(rcvr interface{}, name string, useName bool) error {
	s := new(service)
	s.typ = reflect.TypeOf(rcvr)
	s.rcvr = reflect.ValueOf(rcvr)
	sname := reflect.Indirect(s.rcvr).Type().Name()
	if useName {
		sname = name
	}
	if sname == "" {
		s := "rpc.Register: no service name for type " + s.typ.String()
		log.Print(s)
		return errors.New(s)
	}
	if !token.IsExported(sname) && !useName {
		s := "rpc.Register: type " + sname + " is not exported"
		log.Print(s)
		return errors.New(s)
	}
	s.name = sname

	// Install the methods
	s.method = suitableMethods(s.typ, logRegisterError)

	if len(s.method) == 0 {
		str := ""

		// To help the user, see if a pointer receiver would work.
		method := suitableMethods(reflect.PtrTo(s.typ), false)
		if len(method) != 0 {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
		} else {
			str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
		}
		log.Print(str)
		return errors.New(str)
	}

	if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
		return errors.New("rpc: service already defined: " + sname)
	}
	return nil
}
```
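Several error branches in register can be triggered directly. This sketch uses a private rpc.NewServer(); registerErrors is a hypothetical helper. Registering a Rect value whose methods have pointer receivers produces the pointer hint. Registering the same type twice hits the LoadOrStore duplicate check. An anonymous struct has no service name.

```go
package main

import (
	"fmt"
	"net/rpc"
)

type Params struct{ Width, Height int }

type Rect struct{}

func (r *Rect) Area(p Params, ret *int) error {
	*ret = p.Width * p.Height
	return nil
}

// registerErrors returns the result of each Register attempt, in order.
func registerErrors() []error {
	srv := rpc.NewServer()
	return []error{
		srv.Register(Rect{}),     // value: pointer-receiver methods are not in its method set
		srv.Register(new(Rect)),  // ok
		srv.Register(new(Rect)),  // duplicate service name "Rect"
		srv.Register(struct{}{}), // unnamed type: no service name
	}
}

func main() {
	for i, err := range registerErrors() {
		fmt.Println(i, err)
	}
}
```

Apart from the duplicate case, register also reports these errors through log.Print.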
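suitableMethods collects the methods of rcvr that satisfy the RPC method rules into a map and returns it (map[string]*methodType). The key is the method name. The value is a methodType describing that method: its reflect.Method plus its argument type and reply type.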
```go
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
```
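Finally, register stores the service in a sync.Map. The key is the service name, by default the name of rcvr's type. The value is the service object, which holds everything needed to invoke rcvr's methods. If a service with the same name is already registered, registration fails.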
```go
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
	return errors.New("rpc: service already defined: " + sname)
}
```
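Next comes rpc.HandleHTTP(), which essentially calls http.Handle with the rpc server we created as the handler. This works because the Server struct implements ServeHTTP, i.e. the http.Handler interface.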
```go
const (
	// Defaults used by HandleHTTP
	DefaultRPCPath   = "/_goRPC_"
	DefaultDebugPath = "/debug/rpc"
)

func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}

// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "CONNECT" {
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		io.WriteString(w, "405 must CONNECT\n")
		return
	}
	conn, _, err := w.(http.Hijacker).Hijack()
	if err != nil {
		log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
		return
	}
	// connected is a package constant: "200 Connected to Go RPC"
	io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
	server.ServeConn(conn)
}
```
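DefaultServer is a package-level variable, initialized with NewServer() when the program starts. Its serviceMap stores the registered services; when an rpc request arrives, the matching service is looked up in serviceMap. Here is the definition of the Server struct.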
```go
// Server represents an RPC Server.
type Server struct {
	serviceMap sync.Map   // map[string]*service
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}

// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
```
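The service struct is defined as follows:
- name: the service name, by default the type name of the receiver, e.g. Rect
- rcvr: obtained with reflect.ValueOf()
- typ: obtained with reflect.TypeOf()
- method: the table of registered methods; the key is the method name and the value is the corresponding methodType.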
```go
type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods
}
```
After http.ListenAndServe(":8080", nil) is called, the server listens on the port and waits for network I/O events.
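For each CONNECT request, ServeHTTP hijacks the connection and calls ServeConn, which wraps it in a gob codec and calls ServeCodec. ServeCodec therefore runs once per connection, blocking in a for loop while it waits for requests. When a request arrives, readRequest decodes the target service, the method, the request header and the arguments. A new goroutine then handles that RPC request on its own: go service.call(server, sending, wg, mtype, req, argv, replyv, codec).
Note that ServeCodec uses a sync.WaitGroup to track in-flight RPC calls. When ServeCodec stops reading (the connection hit EOF or an unrecoverable error), it waits until every goroutine in the WaitGroup has finished sending its response, and only then closes the codec and returns. This is what is usually called a graceful shutdown.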
```go
func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			if debugLog && err != io.EOF {
				log.Println("rpc:", err)
			}
			if !keepReading {
				break
			}
			// send a response if we actually managed to read a header.
			if req != nil {
				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
				server.freeRequest(req)
			}
			continue
		}
		wg.Add(1)
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	// We've seen that there are no more requests.
	// Wait for responses to be sent before closing codec.
	wg.Wait()
	codec.Close()
}
```
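Stripped of RPC details, the concurrency shape of ServeCodec looks roughly like this sketch. serveAll is hypothetical: a slice stands in for readRequest, a function for the method call, and appending under a mutex for the serialized response write.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// serveAll handles each request in its own goroutine, serializes "writes" with
// one mutex, and waits for every handler before the final "close".
func serveAll(requests []int, handle func(int) int) (out []int, closedAfter int) {
	var (
		sending sync.Mutex // plays the role of ServeCodec's sending mutex
		wg      sync.WaitGroup
	)
	for _, req := range requests { // stands in for the readRequest loop
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			resp := handle(r)
			sending.Lock()
			out = append(out, resp) // stands in for writing the response
			sending.Unlock()
		}(req)
	}
	wg.Wait()              // every response has been written
	closedAfter = len(out) // stands in for codec.Close()
	sort.Ints(out)         // handlers finish in any order
	return out, closedAfter
}

func main() {
	fmt.Println(serveAll([]int{3, 1, 2}, func(x int) int { return x * 10 })) // [10 20 30] 3
}
```

Without wg.Wait, the "close" could run while handlers are still writing. This is the situation the WaitGroup in ServeCodec prevents.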
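Digging further into service.call:
- mtype.numCalls counts how many times this method has been called; the increment is done while holding mtype's lock.
- mtype.method.Func is the method's function value. Once obtained, it can be invoked through reflection: returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
- returnValues is a []reflect.Value whose single element holds the method's error result. The actual reply produced by the method is written into replyv.
- Finally, server.sendResponse sends the result of the call back to the client.
- server.freeRequest returns the request object to the server's free list (under reqLock) so that it can be reused.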
```go
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
	if wg != nil {
		defer wg.Done()
	}
	mtype.Lock()
	mtype.numCalls++
	mtype.Unlock()
	function := mtype.method.Func
	// Invoke the method, providing a new value for the reply.
	returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
	server.freeRequest(req)
}

func (server *Server) getRequest() *Request {
	server.reqLock.Lock()
	req := server.freeReq
	if req == nil {
		req = new(Request)
	} else {
		server.freeReq = req.next
		*req = Request{}
	}
	server.reqLock.Unlock()
	return req
}

func (server *Server) freeRequest(req *Request) {
	server.reqLock.Lock()
	req.next = server.freeReq
	server.freeReq = req
	server.reqLock.Unlock()
}
```
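getRequest and freeRequest implement a tiny free list. The sketch below copies that shape with hypothetical request and freeList types. Zeroing the recycled object in get is what keeps a previous call's fields from leaking into the next one.

```go
package main

import (
	"fmt"
	"sync"
)

// request and freeList are hypothetical types mirroring Server's free list:
// a mutex-protected singly linked list of spare objects.
type request struct {
	Method string
	Seq    uint64
	next   *request
}

type freeList struct {
	mu   sync.Mutex
	head *request
}

// get pops a spare request, or allocates one if the list is empty.
func (f *freeList) get() *request {
	f.mu.Lock()
	defer f.mu.Unlock()
	req := f.head
	if req == nil {
		return new(request)
	}
	f.head = req.next
	*req = request{} // clear stale fields (including next) before reuse
	return req
}

// put pushes a finished request back onto the list.
func (f *freeList) put(req *request) {
	f.mu.Lock()
	req.next = f.head
	f.head = req
	f.mu.Unlock()
}

func main() {
	var f freeList
	a := f.get()
	a.Method, a.Seq = "Rect.Area", 7
	f.put(a)
	b := f.get()
	fmt.Println(a == b, b.Method == "", b.Seq) // true true 0
}
```

In newer Go code, sync.Pool is the usual tool for this kind of reuse; the hand-written list here simply mirrors the server code above.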
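Calling functions through reflect (Value.Call) is relatively uncommon, so it is worth noting here. The call made is function.Call([]reflect.Value{s.rcvr, argv, replyv}). Suppose the method is Rect.Area, declared as func (r *Rect) Area(p Params, ret *int) error. The receiver s.rcvr is passed as the first argument, followed by argv (p) and replyv (ret).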
https://github.com/golang/go/blob/8ac5cbe05d61df0a7a7c9a38ff33305d4dcfea32/src/reflect/value.go#L335
```go
// Call calls the function v with the input arguments in.
// For example, if len(in) == 3, v.Call(in) represents the Go call v(in[0], in[1], in[2]).
// Call panics if v's Kind is not Func.
// It returns the output results as Values.
// As in Go, each input argument must be assignable to the
// type of the function's corresponding input parameter.
// If v is a variadic function, Call creates the variadic slice parameter
// itself, copying in the corresponding values.
func (v Value) Call(in []Value) []Value {
	v.mustBe(Func)
	v.mustBeExported()
	return v.call("Call", in)
}
```
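The reflective call in service.call can be reproduced outside net/rpc. In this sketch, callByName is hypothetical. It looks up a method on *Rect with MethodByName and allocates a fresh reply with reflect.New, the way the server prepares replyv. It then passes the receiver as the first element of the argument slice.

```go
package main

import (
	"fmt"
	"reflect"
)

type Params struct{ Width, Height int }

type Rect struct{}

func (r *Rect) Area(p Params, ret *int) error {
	*ret = p.Width * p.Height
	return nil
}

// callByName invokes a *Rect method reflectively, as service.call does.
func callByName(name string, p Params) (int, error) {
	rcvr := reflect.ValueOf(new(Rect))
	method, ok := rcvr.Type().MethodByName(name)
	if !ok {
		return 0, fmt.Errorf("no method %q", name)
	}
	argv := reflect.ValueOf(p)
	replyv := reflect.New(method.Type.In(2).Elem()) // a new *int, like the server's replyv
	// method.Func expects the receiver first: Area(rcvr, p, ret).
	out := method.Func.Call([]reflect.Value{rcvr, argv, replyv})
	if errInter := out[0].Interface(); errInter != nil {
		return 0, errInter.(error)
	}
	return int(replyv.Elem().Int()), nil
}

func main() {
	fmt.Println(callByName("Area", Params{Width: 3, Height: 4})) // 12 <nil>
}
```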
