Server

rpcx

  1. // Server is the rpcx server; its listeners can be TCP- or UDP-based.
  2. type Server struct {
  3. ln net.Listener
  4. readTimeout time.Duration
  5. writeTimeout time.Duration
  6. gatewayHTTPServer *http.Server
  7. DisableHTTPGateway bool // whether invoking services over the HTTP gateway is disabled.
  8. DisableJSONRPC bool // whether JSON-RPC is disabled.
  9. serviceMapMu sync.RWMutex // presumably guards serviceMap — TODO confirm
  10. serviceMap map[string]*service
  11. mu sync.RWMutex
  12. activeConn map[net.Conn]struct{} // set of connections, keyed by net.Conn
  13. doneChan chan struct{}
  14. seq uint64
  15. inShutdown int32
  16. onShutdown []func(s *Server)
  17. // tlsConfig is used to create TLS listeners; when it is nil the TCP listener is created without TLS.
  18. tlsConfig *tls.Config
  19. // options holds extra settings; the kcp listener reads its kcp.BlockCrypt from the "BlockCrypt" key.
  20. options map[string]interface{}
  21. // corsOptions holds the CORS options.
  22. corsOptions *CORSOptions
  23. Plugins PluginContainer
  24. // AuthFunc can be used to authenticate a request, given its message and a token.
  25. AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
  26. handlerMsgNum int32
  27. }

gorilla/rpc

  1. // Server serves registered RPC services using registered codecs.
  2. type Server struct {
  3. codecs map[string]Codec // registered codecs by string key (presumably the content type — TODO confirm)
  4. services *serviceMap // registered services
  5. interceptFunc func(i *RequestInfo) *http.Request // NOTE(review): presumably lets a hook substitute the request — confirm
  6. beforeFunc func(i *RequestInfo) // NOTE(review): presumably runs before the RPC method — confirm
  7. afterFunc func(i *RequestInfo) // NOTE(review): presumably runs after the RPC method — confirm
  8. }

grpc-go

  1. // Server is a gRPC server to serve RPC requests.
  2. type Server struct {
  3. opts serverOptions // server-wide configuration
  4. mu sync.Mutex // guards following
  5. lis map[net.Listener]bool // keyed by listener
  6. conns map[transport.ServerTransport]bool // keyed by server transport
  7. serve bool
  8. drain bool
  9. cv *sync.Cond // signaled when connections close for GracefulStop
  10. m map[string]*service // service name -> service info
  11. events trace.EventLog
  12. quit *grpcsync.Event
  13. done *grpcsync.Event
  14. channelzRemoveOnce sync.Once
  15. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  16. channelzID int64 // channelz unique identification number
  17. czData *channelzData
  18. }

Client

rpcx

  1. // RPCClient is the interface that defines a client used to call one server.
  2. type RPCClient interface {
  3. Connect(network, address string) error
  4. Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call // NOTE(review): presumably asynchronous, completing on done — confirm
  5. Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
  6. SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) // sends a raw protocol.Message
  7. Close() error
  8. RegisterServerMessageChan(ch chan<- *protocol.Message)
  9. UnregisterServerMessageChan()
  10. IsClosing() bool
  11. IsShutdown() bool
  12. }
  1. // Client represents an RPC client.
  2. type Client struct {
  3. option Option
  4. Conn net.Conn
  5. r *bufio.Reader // NOTE(review): presumably a buffered reader over Conn — confirm
  6. //w *bufio.Writer
  7. mutex sync.Mutex // protects following
  8. seq uint64
  9. pending map[uint64]*Call // calls keyed by a uint64 (presumably seq — TODO confirm)
  10. closing bool // user has called Close
  11. shutdown bool // server has told us to stop
  12. pluginClosed bool // the plugin has been called
  13. Plugins PluginContainer
  14. ServerMessageChan chan<- *protocol.Message // send-only channel of protocol messages
  15. }

grpc-go

  1. // ClientConn represents a client connection to an RPC server.
  2. type ClientConn struct {
  3. ctx context.Context
  4. cancel context.CancelFunc // NOTE(review): presumably cancels ctx — confirm
  5. target string
  6. parsedTarget resolver.Target // NOTE(review): presumably target after parsing — confirm
  7. authority string
  8. dopts dialOptions
  9. csMgr *connectivityStateManager
  10. balancerBuildOpts balancer.BuildOptions
  11. blockingpicker *pickerWrapper
  12. mu sync.RWMutex
  13. resolverWrapper *ccResolverWrapper
  14. sc *ServiceConfig
  15. conns map[*addrConn]struct{} // set of addrConns
  16. // Keepalive parameter can be updated if a GoAway is received.
  17. mkp keepalive.ClientParameters
  18. curBalancerName string
  19. balancerWrapper *ccBalancerWrapper
  20. retryThrottler atomic.Value
  21. firstResolveEvent *grpcsync.Event
  22. channelzID int64 // channelz unique identification number
  23. czData *channelzData
  24. }

Codec

rpcx

  1. // Codec defines the interface that encodes and decodes payloads.
  2. type Codec interface {
  3. Encode(i interface{}) ([]byte, error) // encodes i into bytes
  4. Decode(data []byte, i interface{}) error // decodes data into i
  5. }

gorilla/rpc

  1. // Codec creates a CodecRequest to process each request.
  2. type Codec interface {
  3. NewRequest(*http.Request) CodecRequest // returns the CodecRequest for the given HTTP request
  4. }
  5. // CodecRequest decodes a request and encodes a response using a specific
  6. // serialization scheme.
  7. type CodecRequest interface {
  8. // Method reads the request and returns the RPC method name.
  9. Method() (string, error)
  10. // ReadRequest reads the request, filling the RPC method args.
  11. ReadRequest(interface{}) error
  12. // WriteResponse writes the response using the RPC method reply. The error
  13. // parameter is the error returned by the method call, if any.
  14. WriteResponse(http.ResponseWriter, interface{}, error) error
  15. }

Service

rpcx

  1. type methodType struct { // describes one method of a service
  2. sync.Mutex // protects counters (the only counter listed, numCalls, is commented out)
  3. method reflect.Method
  4. ArgType reflect.Type // NOTE(review): presumably the type of the method's args parameter — confirm
  5. ReplyType reflect.Type // NOTE(review): presumably the type of the method's reply parameter — confirm
  6. // numCalls uint
  7. }
  8. type functionType struct { // describes one function of a service
  9. sync.Mutex // protects counters (no counter field is declared in this struct)
  10. fn reflect.Value // the function, held as a reflect.Value
  11. ArgType reflect.Type // NOTE(review): presumably the type of the function's args parameter — confirm
  12. ReplyType reflect.Type // NOTE(review): presumably the type of the function's reply parameter — confirm
  13. }
  14. type service struct { // one registered service: its receiver plus its methods and functions
  15. name string // name of service
  16. rcvr reflect.Value // receiver of methods for the service
  17. typ reflect.Type // type of the receiver
  18. method map[string]*methodType // registered methods, keyed by string (presumably the method name — TODO confirm)
  19. function map[string]*functionType // registered functions, keyed by string (presumably the function name — TODO confirm)
  20. }

gorilla/rpc

  1. type service struct { // one registered service: its receiver and its methods
  2. name string // name of service
  3. rcvr reflect.Value // receiver of methods for the service
  4. rcvrType reflect.Type // type of the receiver
  5. methods map[string]*serviceMethod // registered methods
  6. passReq bool // NOTE(review): presumably whether the *http.Request is passed to methods — confirm
  7. }
  8. type serviceMethod struct { // one method of a service, with its argument and reply types
  9. method reflect.Method // receiver method
  10. argsType reflect.Type // type of the request argument
  11. replyType reflect.Type // type of the response argument
  12. }

grpc-go

  1. // MethodDesc represents an RPC service's method specification.
  2. type MethodDesc struct {
  3. MethodName string // name of the method
  4. Handler methodHandler // handler for the method
  5. }
  6. // ServiceDesc represents an RPC service's specification.
  7. type ServiceDesc struct {
  8. ServiceName string // name of the service
  9. // The pointer to the service interface. Used to check whether the user
  10. // provided implementation satisfies the interface requirements.
  11. HandlerType interface{}
  12. Methods []MethodDesc // method specifications of the service
  13. Streams []StreamDesc // stream specifications of the service
  14. Metadata interface{}
  15. }
  16. // service consists of the information of the server serving this service and
  17. // the methods in this service.
  18. type service struct {
  19. server interface{} // the server for service methods
  20. md map[string]*MethodDesc // method descriptors, keyed by string (presumably the method name — TODO confirm)
  21. sd map[string]*StreamDesc // stream descriptors, keyed by string (presumably the stream name — TODO confirm)
  22. mdata interface{}
  23. }

Option

grpc-go

  1. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  2. type ServerOption interface {
  3. apply(*serverOptions) // applies this option to the given serverOptions
  4. }
  5. type serverOptions struct { // the settings that ServerOption values are applied to
  6. creds credentials.TransportCredentials
  7. codec baseCodec
  8. cp Compressor
  9. dc Decompressor
  10. unaryInt UnaryServerInterceptor // unary interceptor
  11. streamInt StreamServerInterceptor // stream interceptor
  12. inTapHandle tap.ServerInHandle
  13. statsHandler stats.Handler
  14. maxConcurrentStreams uint32
  15. maxReceiveMessageSize int
  16. maxSendMessageSize int
  17. unknownStreamDesc *StreamDesc
  18. keepaliveParams keepalive.ServerParameters
  19. keepalivePolicy keepalive.EnforcementPolicy
  20. initialWindowSize int32
  21. initialConnWindowSize int32
  22. writeBufferSize int
  23. readBufferSize int
  24. connectionTimeout time.Duration
  25. maxHeaderListSize *uint32 // pointer, so nil is distinguishable from zero (presumably "unset" — TODO confirm)
  26. }

Connection

rpcx

tcp

  1. var makeListeners = make(map[string]MakeListener) // network name -> listener factory
  2. func init() { // registers the TCP-backed factories; "http" reuses the "tcp" factory
  3. for _, network := range []string{"tcp", "tcp4", "tcp6"} {
  4. makeListeners[network] = tcpMakeListener(network)
  5. }
  6. makeListeners["http"] = tcpMakeListener("tcp")
  7. }
  8. // RegisterMakeListener registers ml as the MakeListener for network, replacing any earlier registration for it.
  9. func RegisterMakeListener(network string, ml MakeListener) {
  10. makeListeners[network] = ml
  11. }
  12. // MakeListener defines a listener generator: given the server and an address, it returns a net.Listener.
  13. type MakeListener func(s *Server, address string) (ln net.Listener, err error)
  14. // makeListener looks up the MakeListener registered for network and calls it with s and address.
  15. // It returns an error when no MakeListener is registered for network.
  16. func (s *Server) makeListener(network, address string) (ln net.Listener, err error) {
  17. factory := makeListeners[network]
  18. if factory != nil {
  19. return factory(s, address)
  20. }
  21. return nil, fmt.Errorf("can not make listener for %s", network)
  22. }
  23. // tcpMakeListener returns a listener factory for network. The factory listens
  24. // through tls.Listen when s.tlsConfig is set and through net.Listen otherwise.
  25. func tcpMakeListener(network string) func(s *Server, address string) (ln net.Listener, err error) {
  26. return func(s *Server, address string) (ln net.Listener, err error) {
  27. if cfg := s.tlsConfig; cfg != nil {
  28. return tls.Listen(network, address, cfg)
  29. }
  30. return net.Listen(network, address)
  31. }
  32. }

quic

import quicconn "github.com/marten-seemann/quic-conn"

  1. func init() { // registers quicMakeListener under the "quic" network name
  2. makeListeners["quic"] = quicMakeListener
  3. }
  4. // quicMakeListener listens on address via quicconn over "udp"; s.tlsConfig must be set.
  5. func quicMakeListener(s *Server, address string) (ln net.Listener, err error) {
  6. if s.tlsConfig != nil {
  7. return quicconn.Listen("udp", address, s.tlsConfig)
  8. }
  9. return nil, errors.New("TLSConfig must be configured in server.Options")
  10. }

utp

import "github.com/anacrolix/utp"

  1. func init() { // registers utpMakeListener under the "utp" network name
  2. makeListeners["utp"] = utpMakeListener
  3. }
  4. func utpMakeListener(s *Server, address string) (ln net.Listener, err error) { // s is not used by this factory
  5. return utp.Listen(address) // delegates entirely to utp.Listen
  6. }

kcp

import kcp "github.com/xtaci/kcp-go"

  1. func init() { // registers kcpMakeListener under the "kcp" network name
  2. makeListeners["kcp"] = kcpMakeListener
  3. }
  4. // kcpMakeListener creates a KCP listener on address using the kcp.BlockCrypt stored
  5. // under "BlockCrypt" in s.options. It returns an error, rather than panicking, when
  6. // that entry is missing, nil, or not a kcp.BlockCrypt.
  7. func kcpMakeListener(s *Server, address string) (ln net.Listener, err error) {
  8. // Indexing a nil map is safe, and the comma-ok assertion is false for a nil value.
  9. bc, ok := s.options["BlockCrypt"].(kcp.BlockCrypt)
  10. if !ok {
  11. return nil, errors.New("KCP BlockCrypt must be configured in server.Options")
  12. }
  13. l, err := kcp.ListenWithOptions(address, bc, 10, 3)
  14. if err != nil {
  15. // Return a literal nil so a failed listen never yields a non-nil net.Listener.
  16. return nil, err
  17. }
  18. return l, nil
  19. }
  20. // WithBlockCrypt returns an OptionFn that stores bc under "BlockCrypt" in the
  21. // server options, creating the options map first if it is nil.
  22. func WithBlockCrypt(bc kcp.BlockCrypt) OptionFn {
  23. return func(s *Server) {
  24. if s.options == nil {
  25. s.options = make(map[string]interface{})
  26. }
  27. s.options["BlockCrypt"] = bc
  28. }
  29. }