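Go's net/rpc is a standard library package with little code but a well-thought-out design, which makes it an excellent resource for anyone who wants to learn idiomatic Go and how RPC works under the hood. Here we analyze the core of the Go rpc library, starting from its source code.
An RPC framework has to solve a few core technical problems:
- Cross-platform, cross-language communication: serializing and deserializing data
- Working out which RPC request the current response belongs to
- Keeping track of the goroutines that are waiting for replies after their requests have been sent
- Supporting asynchronous styles: callbacks, blocking calls, and futures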
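The second and third problems come down to one data structure. As a minimal sketch (the `pendingTable` type and its methods are hypothetical, not part of net/rpc), a mutex-protected counter hands out sequence numbers, and a map from sequence number to a buffered channel parks each waiting caller until its reply is delivered, in whatever order replies arrive:

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTable is a hypothetical, stripped-down version of the idea behind
// Client.seq and Client.pending: each request gets a unique sequence number,
// and the waiting caller is parked on a channel keyed by that number.
type pendingTable struct {
	mu      sync.Mutex
	seq     uint64
	pending map[uint64]chan string
}

func newPendingTable() *pendingTable {
	return &pendingTable{pending: make(map[uint64]chan string)}
}

// register allocates the next sequence number and a channel to wait on.
func (p *pendingTable) register() (uint64, chan string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	seq := p.seq
	p.seq++
	ch := make(chan string, 1) // buffered so deliver never blocks
	p.pending[seq] = ch
	return seq, ch
}

// deliver routes a reply to whoever registered seq. It reports false when
// nobody is waiting (unknown or already-answered sequence number).
func (p *pendingTable) deliver(seq uint64, reply string) bool {
	p.mu.Lock()
	ch, ok := p.pending[seq]
	delete(p.pending, seq)
	p.mu.Unlock()
	if ok {
		ch <- reply
	}
	return ok
}

func main() {
	p := newPendingTable()
	s0, ch0 := p.register()
	s1, ch1 := p.register()
	// Replies may arrive out of order; the sequence number routes each one.
	p.deliver(s1, "reply-1")
	p.deliver(s0, "reply-0")
	fmt.Println(<-ch0, <-ch1) // reply-0 reply-1
}
```

Giving each channel a capacity of 1 means the delivering goroutine never blocks on a slow caller; net/rpc makes the same choice in Call, which passes `make(chan *Call, 1)` to Go.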
Next, we analyze the core of Go's rpc standard library by looking at the client and server implementations in turn.
The client makes a remote call
https://github.com/golang/go/blob/master/src/net/rpc/client_test.go
Following the official example, we start an RPC client and make a remote call like this:
// establish a TCP connection to the remote server
client, err := rpc.Dial("tcp", "127.0.0.1:8080")
// besides plain TCP, a connection can also be made over HTTP or with the JSON-RPC codec:
// jsonrpc.Dial("tcp", "127.0.0.1:8080")
// rpc.DialHTTP("tcp", "127.0.0.1:8080")
if err != nil {
panic(err)
}
// call the remote method S.Recv with request struct{}{} and decode the result into reply; this call blocks
var reply Reply
err = client.Call("S.Recv", &struct{}{}, &reply)
if err != nil {
panic(err)
}
fmt.Printf("%#v\n", reply)
First, let's look at the Client struct, which holds all the state the RPC client needs to manage its remote calls.
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
codec ClientCodec // encodes requests and decodes responses
reqMutex sync.Mutex // protects following: guards request and serializes writes
request Request
mutex sync.Mutex // protects following: seq, pending, closing, shutdown
seq uint64
pending map[uint64]*Call // calls that were sent but not yet answered, i.e. pending
closing bool // user has called Close
shutdown bool // server has told us to stop
}
// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
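Client manages outgoing remote calls. One Client per server connection is enough: it may be shared by many goroutines, and every subsequent request to that server goes through it. Its key fields are:
- codec ClientCodec: the codec that encodes requests as they are sent and decodes responses as they arrive. NewClient uses gob (binary and more efficient); the net/rpc/jsonrpc package provides a JSON codec.
- request Request: the request header, reused for every call
- reqMutex sync.Mutex: locks request so it is safe under concurrency; it also ensures that only one request is written to the connection at a time
- seq uint64: the sequence number that tags each request; it is incremented by one for every RPC issued
- mutex sync.Mutex: the lock guarding seq (as well as pending, closing and shutdown), since several goroutines may try to modify seq at the same moment
- pending map[uint64]*Call: a map holding every request that has been sent but has not yet received a reply
- closing bool: set once the user has called Close; new calls then fail with ErrShutdown
- shutdown bool: set by the input goroutine when the connection fails or is closed; new calls then fail with ErrShutdown as well
The Client struct keeps sent-but-unanswered requests in a map[uint64]*Call, where Call is defined as follows: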
// Call represents an active RPC.
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
}
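Dial establishes an RPC connection to the target server; its arguments are the network protocol and the server's IP:PORT. Internally it calls net.Dial to open a TCP connection, wraps that connection with NewClient(conn), and returns the resulting client to the caller.
NewClient actually creates a gob codec, gobClientCodec, and hands it to NewClientWithCodec, which starts a goroutine (go client.input()) dedicated to handling the responses of remote calls. Whenever a message arrives from the network, input runs the rest of the processing.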
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
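In my view, Call is the most important part of the whole RPC framework: this one method hides the entire process of sending a request, waiting, and waking the goroutine once the reply arrives.
Looking at Call's signature, its parameters are the name of the remote method, the request arguments, and the reply value. Note that args and reply are of type interface{}, so any struct can be passed in, which makes the API very flexible (reply must be a pointer, so that the decoded result can be written into it).
Calling Call issues the given function call to the server. The full remote call is fairly involved and has several substeps: encoding, network I/O, processing on the remote server, the client receiving the reply, and decoding. Yet Call does not start a new goroutine to drive this flow: the calling goroutine simply blocks on a channel, and the client's input goroutine notifies it through that channel when the reply arrives.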
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
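Call is just a wrapper around Go, where the core logic lives. Go does the following:
- allocates a new Call object
- sets the Call's serviceMethod, args and reply, plus the done channel passed in (allocating a buffered channel of capacity 10 if done is nil, and panicking if done is unbuffered)
- calls client.send to send the request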
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
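Next, we dig into the core function client.send. Its job is to send the remote-call request: it assigns the request the current sequence number, increments seq, and stores the Call in the pending map so the reply can be matched to it later. Note that the Call is registered in pending before the request is written; if the write fails, it is removed again and completed with the error.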
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock() // guards seq and pending
if client.shutdown || client.closing { // the client is shut down or closing, so this send fails
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++ // increment the sequence number
client.pending[seq] = call // cache the call in the pending map
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq // the sequence number travels to the server and comes back in the response, which is how we know which request a reply belongs to
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args) // encode the request with the codec and write it to the network
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
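We now know how requests are sent; next, let's see how a response is received and how the sleeping goroutine is woken up. This is the job of input(), which NewClient starts as a goroutine when the client is created.
- First, decode the response header with the client codec.
- Take the sequence number response.Seq, use it to find the matching RPC request in the pending map, and remove that request from the map. This step must hold the lock.
- Depending on the error information in the response, take either the error path or the normal data path.
- client.codec.ReadResponseBody reads the response payload into call.Reply, and call.done() then wakes the waiting goroutine through its channel.
- Note in particular that the loop exits as soon as reading from the codec fails (a header read error such as the connection being closed, or a body read error). A server-side error, or a reply whose sequence number is not in the pending map, does not stop the loop on its own; it only does so if reading the discarded body then fails. Once the loop exits, the client shuts down: client.shutdown is set to true, and every call still in the pending map is woken up with the error (an io.EOF is reported as ErrShutdown if the user closed the client, and as io.ErrUnexpectedEOF otherwise).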
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil: // no pending call has this seq: the body is read and discarded, and the loop ends only if that read fails
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
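When we explicitly call client.Close, it actually:
- takes the lock and sets client.closing to true (returning ErrShutdown if it was already set)
- releases the lock and closes the client's codec with client.codec.Close()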
// Close calls the underlying codec's Close method. If the connection is already
// shutting down, ErrShutdown is returned.
func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}
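All the client.Call invocations above are synchronous, blocking RPCs. For an asynchronous call, use client.Go to obtain the Call and its Done channel, and receive from that channel only when the result is actually needed: if the reply has already arrived, we take the data and continue; otherwise we block until it does. This is the pattern commonly called a future.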
// Asynchronous call
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
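The server receives a remote call
We use an RPC server that speaks HTTP as the example for the source analysis. Below is the flow for starting an RPC server and registering a service. We define a Rect struct that implements the Area and Perimeter methods.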
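package main
import (
"log"
"net/http"
"net/rpc"
)
type Params struct {
Width, Height int
}
type Rect struct{}
// An RPC method must follow these rules:
// the method and its receiver type must be exported
// it takes exactly two arguments, both of exported (or builtin) types
// the first argument is the request sent by the client
// the second argument is the reply returned to the client and must be a pointer
// the method returns a single value of type error
func (r *Rect) Area(p Params, ret *int) error {
*ret = p.Width * p.Height
return nil
}
func (r *Rect) Perimeter(p Params, ret *int) error {
*ret = (p.Width + p.Height) * 2
return nil
}
func main() {
rect := new(Rect)
// register the Rect service (its service name is "Rect")
if err := rpc.Register(rect); err != nil {
log.Fatal(err)
}
// serve RPC over HTTP on rpc.DefaultRPCPath
rpc.HandleHTTP()
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal(err)
}
}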
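Register actually calls server.register. It first uses reflection to obtain the type and value of the service rcvr, plus the service name: by default this is the name of rcvr's concrete type (Rect here, found by stepping through the pointer with reflect.Indirect), and only when a name is supplied explicitly through RegisterName is that name used instead. When the client makes a remote call, it names the method explicitly, e.g. "Rect.Area", to say that it wants the Area method of the server's Rect service.
register relies on Go's reflection, so let's recall what TypeOf() and ValueOf() do. reflect.TypeOf() returns the type object (reflect.Type) of any value, through which the program can inspect that value's type information. reflect.ValueOf() returns a reflect.Value holding the value itself. A reflect.Value and the original value can be converted into each other: ValueOf wraps a value, and Interface() unwraps it. reflect.Value is the central type for many reflective operations, such as calling a function through reflection.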
func (server *Server) Register(rcvr interface{}) error {
return server.register(rcvr, "", false)
}
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
if !token.IsExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, logRegisterError)
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
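suitableMethods collects every suitable method implemented by rcvr into a map and returns it (map[string]*methodType): the key is the method name, and the value is a methodType describing that method, namely the reflect.Method itself together with its argument type and reply type.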
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
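Finally, register stores the service in a sync.Map. The key is the service name (by default the name of rcvr's type), and the value is the service object, which holds everything needed to invoke rcvr's methods.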
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
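Next comes rpc.HandleHTTP(), which essentially calls http.Handle to register our RPC server as the handler for DefaultRPCPath (plus a debugging page on DefaultDebugPath). This works because the Server struct implements ServeHTTP. ServeHTTP accepts only the CONNECT method: it hijacks the underlying connection, writes an HTTP/1.0 status line, and from then on speaks the plain RPC protocol through ServeConn.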
const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"
DefaultDebugPath = "/debug/rpc"
)
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
http.Handle(rpcPath, server)
http.Handle(debugPath, debugHTTP{server})
}
// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func HandleHTTP() {
DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}
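DefaultServer is a global variable holding the Server created by NewServer() when the package is initialized. serviceMap stores each registered service; when an RPC request arrives, the matching service is looked up in serviceMap. Here is the definition of the Server struct.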
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
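The service struct is defined as follows:
- name: the service name, by default the type name, e.g. Rect
- rcvr: obtained with reflect.ValueOf()
- typ: obtained with reflect.TypeOf()
- method: the table of registered methods; the key is the method name and the value describes the corresponding method.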
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
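After http.ListenAndServe(":8080", nil), the server starts listening on the port and handling network I/O.
When a remote-call request arrives, ServeCodec reads and decodes it. Let's walk through its main logic. ServeCodec is not started when the server is created: ServeConn calls it once per connection (for the HTTP server, after ServeHTTP has hijacked the connection), and it then blocks in a for loop waiting for requests. When a request arrives, readRequest decodes the target service, the method type, the request header, and the argument and reply values, and ServeCodec then starts a goroutine dedicated to this one RPC: go service.call(server, sending, wg, mtype, req, argv, replyv, codec).
Note that ServeCodec uses a sync.WaitGroup to track in-flight calls. When ServeCodec has to stop (for example because the client closed the connection), it waits until every goroutine in the WaitGroup has finished sending its response, and only then closes the codec and exits. This is what is commonly called a graceful shutdown.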
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec.
wg.Wait()
codec.Close()
}
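Digging further into service.call:
- mtype.numCalls counts how many times this method has been called; the increment is done under mtype's lock.
- mtype.method.Func fetches the concrete function, which can then be invoked through reflection: returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
- returnValues is a []reflect.Value whose single element is the method's error result; the actual reply produced by the call is written into replyv.
- Finally, server.sendResponse sends the result back to the client.
- server.freeRequest recycles the request: under a lock, it pushes the now-unused Request back onto the free list for reuse.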
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}
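getRequest and freeRequest form an intrusive free list threaded through Request.next. A standalone sketch of the same structure, with hypothetical `request` and `freeList` types:

```go
package main

import (
	"fmt"
	"sync"
)

// request mirrors the shape of rpc.Request: header fields plus a next
// pointer that links unused headers into a free list.
type request struct {
	ServiceMethod string
	Seq           uint64
	next          *request
}

type freeList struct {
	mu   sync.Mutex
	head *request
}

// get pops a recycled request (zeroed) or allocates a new one.
func (f *freeList) get() *request {
	f.mu.Lock()
	defer f.mu.Unlock()
	req := f.head
	if req == nil {
		return new(request)
	}
	f.head = req.next
	*req = request{} // clear stale fields, including next
	return req
}

// put pushes a request back onto the list for reuse.
func (f *freeList) put(req *request) {
	f.mu.Lock()
	req.next = f.head
	f.head = req
	f.mu.Unlock()
}

func main() {
	var fl freeList
	a := fl.get()
	a.ServiceMethod, a.Seq = "Rect.Area", 7
	fl.put(a)
	b := fl.get()
	fmt.Println(a == b, b.ServiceMethod == "", b.Seq) // true true 0
}
```

Reusing headers this way avoids one allocation per request; in newer Go code, sync.Pool is the more common tool for the same job.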
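Calling functions through reflect is not very common, so it is worth studying here. We invoke function.Call([]reflect.Value{s.rcvr, argv, replyv}); suppose the method being called is Rect.Area, declared as func (r *Rect) Area(p Params, ret *int) error. Because function is the Func of a reflect.Method taken from the receiver's type, the receiver s.rcvr must be passed explicitly as the first argument, followed by argv and replyv.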
https://github.com/golang/go/blob/8ac5cbe05d61df0a7a7c9a38ff33305d4dcfea32/src/reflect/value.go#L335
// Call calls the function v with the input arguments in.
// For example, if len(in) == 3, v.Call(in) represents the Go call v(in[0], in[1], in[2]).
// Call panics if v's Kind is not Func.
// It returns the output results as Values.
// As in Go, each input argument must be assignable to the
// type of the function's corresponding input parameter.
// If v is a variadic function, Call creates the variadic slice parameter
// itself, copying in the corresponding values.
func (v Value) Call(in []Value) []Value {
v.mustBe(Func)
v.mustBeExported()
return v.call("Call", in)
}