rpcx 框架

作者:李骁

rpcx 框架简介

框架rpcx包含了服务发现、负载均衡、故障转移等服务治理能力,拥有较多的特性,例如无需定义.proto文件,支持跨语言的服务调用等。目前只支持Go语言,但性能良好,可以当作微服务框架来使用。

下面开始来了解下rpcx的使用,文中例子用户服务作为本篇全文的通用示例,看看利用rpcx框架来实现RPC难易程度如何。

首先安装 rpcx框架:

  1. go get -u -v github.com/smallnest/rpcx/...

由于rpcx 后续服务注册中心的需要,还需要加上一些标签来安装,即使这些标签刚开始可能用不上,但建议最好都选择安装,或许最合适的安装命令是这样:

  1. go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...

rpcx 构建服务

由于rpcx对开发的目录结构并没有强制性的规定,所以首先需要为项目规划良好的工程目录结构,下面是用户服务在rpcx中的目录结构:

  1. └─appservice
  2. └─member
  3. ├─cmd
  4. ├─client
  5. client.go
  6. └─server
  7. server.go
  8. ├─model
  9. member.go
  10. └─service
  11. service.go

appservice作为所有服务的总目录入口,member目录是用户服务的目录,下面cmd作为客户端和服务端入口程序的目录,model目录专门用来定义数据结构,而service作为服务的主要实现目录,存放service.go文件,在该文件中定义了用户服务的所有方法和参数类型结构。这就是单个服务整体的目录结构,当然如果有配置项还可以建立conf目录。

该服务的目录结构建议在其他服务中也保持一致,这样在开发中对提升效率会有较大帮助,而且这样的约定也是在开发中非常有必要存在的。

在最关键的service目录中,定义了服务的主要实现。文件service.go主要代码如下:

  1. type Args struct {
  2. Uid int
  3. }
  4. type Reply struct {
  5. model.User
  6. }
  7. type ServiceUser struct {
  8. }
  9. func (s *ServiceUser) UserInfo(ctx context.Context, args *Args, reply *Reply) error {
  10. fmt.Println("service:", args.Uid)
  11. reply.User.AddTime = 14990093
  12. reply.User.Uface = "http://image.xxxx.xxx/t.gif"
  13. reply.User.UID = int64(args.Uid)
  14. reply.User.UserName = "Joke"
  15. reply.User.UserType = 2
  16. return nil
  17. }

ServiceUser作为服务结构体存在,UserInfo(ctx context.Context, args Args, reply Reply)方法是用户服务的方法,这个方法需要满足一定的约束:

  • 服务方法是可导出的(首字母大写)
  • 该方法必须有两个可导出或是内建类型的参数
  • 第一个参数为context.Context,第二个参数是输入参数用来接收数据,第三个参数作为输出参数且必须是指针类型
  • 方法返回类型为error

这些约束条件中除了第一个参数为context.Context,其他的条件大致与Go语言中定义的RPC方法需要满足一定的条件约束相一致。

在service.go文件中还分别定义了两个可导出的结构体Args和Reply,分别作为服务方法的第二个、第三个参数的类型。这两个参数类型可自定义或是内建类型,第二个参数也就是这里的Args是输入参数(接收),第三个参数也即Reply是输出参数。

对于方法UserInfo(),在实际中应该读取数据库或缓存,在这里不是讨论的重点,故直接赋值。有兴趣的读者可以进行拓展,可在model目录中来处理数据库的访问与处理。

在model目录中的文件member.go定义了用户结构体:

  1. type User struct {
  2. UID int64 `json:"id"`
  3. AddTime int64 `json:"addtime"`
  4. UserType int32 `json:"utype"`
  5. Uface string `json:"uface"`
  6. UserName string `json:"uname"`
  7. }

接下来,通过服务端程序注册该服务以及方法,server.go文件在cmd目录下server目录中,主要代码如下:

  1. var (
  2. addr = flag.String("addr", "localhost:8972", "server address")
  3. )
  4. func main() {
  5. flag.Parse()
  6. s := server.NewServer()
  7. //s.Register(new(service.ServiceUser), "")
  8. s.RegisterName("ServiceUser", new(service.ServiceUser), "")
  9. err := s.Serve("tcp", *addr)
  10. if err != nil {
  11. panic(err)
  12. }
  13. }

首先使用 NewServer() 来创建一个服务实例,再通过RegisterName()或者Register()方法注册用户服务,方便客户端从服务注册中心查找并调用用户服务,然后调用 Serve 或者 ServeHTTP 来监听客户端的请求。

在rpcx框架中定义了一个非常重要和关键的结构体Server:

  1. type Server struct {
  2. ln net.Listener
  3. readTimeout time.Duration
  4. writeTimeout time.Duration
  5. gatewayHTTPServer *http.Server
  6. DisableHTTPGateway bool // 禁用http调用
  7. DisableJSONRPC bool // 禁用json rpc
  8. serviceMapMu sync.RWMutex
  9. serviceMap map[string]*service
  10. mu sync.RWMutex
  11. activeConn map[net.Conn]struct{}
  12. doneChan chan struct{}
  13. seq uint64
  14. inShutdown int32
  15. onShutdown []func(s *Server)
  16. tlsConfig *tls.Config
  17. options map[string]interface{}
  18. // CORS 选项
  19. corsOptions *CORSOptions
  20. // 所有的插件
  21. Plugins PluginContainer
  22. // AuthFunc 用来鉴权
  23. AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
  24. handlerMsgNum int32
  25. }

rpcx 启动选项

在rpcx 框架中,func NewServer(options …OptionFn) *Server方法先实例化一个Server,然后再设置启动选项,一共提供了 3个 OptionFn 来设置启动选项:

  1. func WithReadTimeout(readTimeout time.Duration) OptionFn
  2. func WithTLSConfig(cfg *tls.Config) OptionFn
  3. func WithWriteTimeout(writeTimeout time.Duration) OptionFn

可以分别用来设置服务读超时、tls证书和写超时,也即设置结构体Server的readTimeout,tlsConfig,writeTimeout 这三个字段的值。当然这三个启动选项是可选的,可根据实际需要来决定。

OptionFn 的定义如下:

  1. type OptionFn func(*Server)

是不是感觉很眼熟!没错,这里采用的就是功能选项设计模式,利用功能选项函数很方便地修改Server实例的字段,也可以做为函数NewServer()的参数来设定启动项的值。

服务注册(RegisterName()或者Register())会通过反射机制,生成service结构体的实例,该结构体的字段中name为服务注册时的具体服务名,如没指定服务名则默认为该服务(本例中为service.ServiceUser)的类型名。

  1. type service struct {
  2. name string // 服务名字
  3. rcvr reflect.Value // 服务方法的接收器
  4. typ reflect.Type // 接收器的类型
  5. method map[string]*methodType // 注册的方法
  6. function map[string]*functionType // 注册的函数
  7. }

最终所有注册的服务会生成serviceMap,也即在结构体Server中定义的字段Go serviceMap map[string]*service

有关服务实例的生成和服务注册过程大致就这样。接下来完成客户端的实现,client.go文件在cmd目录下client目录中,主要代码如下:

  1. var (
  2. addr = flag.String("addr", "localhost:8972", "server address")
  3. )
  4. func main() {
  5. flag.Parse()
  6. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  7. xclient := client.NewXClient("ServiceUser", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  8. defer xclient.Close()
  9. args := service.Args{
  10. Uid: 999,
  11. }
  12. reply := &service.Reply{}
  13. err := xclient.Call(context.Background(), "UserInfo", args, reply)
  14. if err != nil {
  15. log.Fatalf("failed to call: %v", err)
  16. }
  17. log.Println(args.Uid, ":", reply.User)
  18. }

首先使用 NewPeer2PeerDiscovery() 来初始化点对点的服务发现(客户端直连每个服务节点),所谓的服务发现简单点说就是找到服务器列表。上面NewPeer2PeerDiscovery() 中的参数值tcp@ipaddress:port表示通过TCP通信。

在rpcx框架中可以通过TCP(tcp@ipaddress:port)、HTTP(http@ipaddress:port)、UnixDomain(unix@ipaddress:port)、QUIC(quic@ipaddress:port)和KCP(kcp@ipaddress:port)通信,而且http客户端可以通过网关或者http调用来访问rpcx服务。

在rpcx中使用network @ Host: port格式表示服务地址,network 可以为 tcp , http ,unix ,quic或kcp,而Host可以是主机名或是IP地址。

接下来通过NewXClient()函数得到客户端的实例,这个客户端实例支持服务发现与服务治理,其结构体xClient如下:

  1. type xClient struct {
  2. failMode FailMode
  3. selectMode SelectMode
  4. cachedClient map[string]RPCClient
  5. breakers sync.Map
  6. servicePath string
  7. option Option
  8. mu sync.RWMutex
  9. servers map[string]string
  10. discovery ServiceDiscovery
  11. selector Selector
  12. isShutdown bool
  13. auth string
  14. Plugins PluginContainer
  15. ch chan []*KVPair
  16. serverMessageChan chan<- *protocol.Message
  17. }

再看 Go func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient 函数的签名,servicePath 是前面服务端定义的服务名“ServiceUser”,在结构体xClient中对应字段为servicePath ,在客户端和服务端中,这两者对应的字符串需要一致才能正常调用。

上面用户服务的客户端在初始化xClient时选择使用 client.Failtry 错误模式,即调用远程方法失败后再次尝试当前服务器,客户端通过随机选择client.RandomSelect的方式来选择服务器,而服务发现在这里则使用了点对点的方式,也就是直接连接到服务器,可选项为client.DefaultOption,其中默认重试次数为3,默认的编码为MsgPack。大致如图1所示:

rpcx-1.png

图1 xClient初始化

客户端在初始化时会根据参数(F.S.D.O,姑且这样称呼)来确定调用失败后处理模式、路由选择的模式、发现服务器列表以及可选配置项。

FailMode和SelectMode为服务治理 (失败模式与路由选择)的选项定义。在大规模的RPC系统中,许多服务节点在提供同一个服务。客户端如何选择最合适的节点来调用呢?如果调用失败,客户端应该选择另一个节点或者立即返回错误?可以通过NewXClient()来指定具体的模式。

失败处理模式FailMode仅对同步调用有效(xClient.Call),而异步调用(xClient.Go)无效,FailMode一共有下面几种值可选择:

  1. type FailMode int
  2. const (
  3. // 自动选择另一台服务器
  4. Failover FailMode = iota
  5. // 立即返回错误
  6. Failfast
  7. // 再次使用当前客户端
  8. Failtry
  9. // 如果第一台服务器在指定时间内没有快速响应,则选择另一台服务器
  10. Failbackup
  11. )

Failfast模式:一旦调用服务节点失败,rpcx会立即返回错误。 注意这个错误可能是网络错误或者服务异常原因造成的。

Failover模式:rpcx如果遇到错误,它会尝试调用另外一个节点, 直到有服务节点能正常返回信息,或者达到最大的重试次数。 重试次数Retries在参数Option中设置, 缺省设置为3。

Failtry模式:rpcx调用一个服务节点出现错误,继续重试这个节点直到节点正常返回数据或者达到最大重试次数。

Failbackup模式: 如果服务节点在一定的时间内不返回结果, rpcx客户端会发送相同的请求到另外一个节点,只要在这两个节点中任一节点有返回,rpcx就算调用成功。

而路由选择模式SelectMode则有下面几种情况可选择:

  1. // SelectMode 定义从候选者中选择服务的算法
  2. type SelectMode int
  3. const (
  4. // 随机选择
  5. RandomSelect SelectMode = iota
  6. // 轮询模式
  7. RoundRobin
  8. // 加权轮询模式
  9. WeightedRoundRobin
  10. // 加权网络质量优先
  11. WeightedICMP
  12. // 一致性Hash
  13. ConsistentHash
  14. // 最近的服务器
  15. Closest
  16. // 通过用户进行选择
  17. SelectByUser = 1000
  18. )

注意,这里的路由是针对 ServicePath 和 ServiceMethod的路由。

随机模式:从服务节点中随机选择一个节点。由于节点是随机选择,所以并不能保证节点之间负载的均匀。

轮询模式:从服务节点列表中逐个选择依次使用,能保证每个节点均匀被访问,在节点服务能力相差不大时适用。

加权轮询模式:使用基于权重的轮询算法。

网络质量优先:客户端会基于ping(ICMP) 探测各个节点的网络质量,网络质量越好则节点的权重也就越高。

一致性哈希:使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由需要重新计算一致性哈希。

地理位置优先:它要求服务在注册的时候要设置它所在的地理经纬度。

在rpcx框架中,根据路由选择模式(SelectMode)并通过选择器(Selector)来确定具体的服务器。选择器是一个接口,其定义如下:

  1. type Selector interface {
  2. Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
  3. UpdateServer(servers map[string]string)
  4. }

上述路由选择模式都已经实现选择器的接口。而且在rpcx框架中支持自定义选择器,如果上述路由选择模式不适合,可考虑实现自己的路由选择器。

另外,客户端的可选配置项结构如下:

  1. type Option struct {
  2. Group string
  3. Retries int
  4. TLSConfig *tls.Config
  5. Block interface{}
  6. RPCPath string
  7. ConnectTimeout time.Duration
  8. ReadTimeout time.Duration
  9. WriteTimeout time.Duration
  10. BackupLatency time.Duration
  11. GenBreaker func() Breaker
  12. SerializeType protocol.SerializeType
  13. CompressType protocol.CompressType
  14. Heartbeat bool
  15. HeartbeatInterval time.Duration
  16. }

在rpcx中,已经预设了一些可选项的值:

  1. var DefaultOption = Option{
  2. Retries: 3,
  3. RPCPath: share.DefaultRPCPath,
  4. ConnectTimeout: 10 * time.Second,
  5. SerializeType: protocol.MsgPack,
  6. CompressType: protocol.None,
  7. BackupLatency: 10 * time.Millisecond,
  8. }

Retries :重试次数。ConnectTimeout:连接超时SerializeType:默认通信协议

还可以设置自动的心跳来保持连接不断掉。客户端需要启用心跳选项,并且设置心跳间隔:

  1. option := client.DefaultOption
  2. option.Heartbeat = true
  3. option.HeartbeatInterval = time.Second

Call()方法是客户端同步远程调用的方法,而另外的Go()方法则是异步远程调用的方法。在这里Call()方法指定调用的RPC方法为用户服务的“UserInfo”方法。当执行Call()方法时,会根据选择器确定的算法(这里是随机)来选择通过服务发现找到的服务器列表,最终确定访问的服务器,远程调用时如果失败则根据失败模式来确定下一步动作,比如上面示例的代码选择Failtry失败模式会重试三次,消息的编码采用MsgPack。当然可以通过设置Option来确定采用其他的编码方式。

用户服务的客户端通过rpcx框架,使用RPC远程调用的方式来调用服务端的方法,现在分别运行服务端和客户端。

在命令行运行服务端程序:

  1. >go run server.go
  2. 2019/07/26 20:50:22 server.go:174: INFO : server pid:724

然后在命令行运行客户端程序:

  1. >go run client.go
  2. 2019/07/26 20:50:41 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

在客户端运行后,服务端接收到客户端请求并响应,控制台会显示:

  1. service: 999
  2. 2019/07/26 20:50:41 server.go:358: INFO : client has closed this connection: 127.0.0.1:60186

当服务端停止服务后,再运行客户端程序,客户端发现调用远程方法失败,接下来会因为client.Failtry模式而重试,而可选项默认的配置是client.DefaultOption.Retries=3,表示重试的次数为三次,所以这里会重试三次而宣告失败。具体如下图2所示:

rpcx-2.png

图2 服务端无响应Failtry

上图中第一次的失败是Call()方法调用失败时的信息。如果把失败模式改为Failfast,停止服务端运行,再运行客户端程序,则调用远程方法时程序会直接报错而不会去尝试重试,具体如下图3所示:

rpcx-3.png

图3 服务端无响应Failfast模式

在上面用户服务中,服务注册是针对服务方法而言的,如s.RegisterName(“ServiceUser”, new(service.ServiceUser), “”),就是将service.ServiceUser用户服务这个结构体的所有方法注册到服务中心。rpcx 也支持将纯函数注册为服务,函数必须满足的条件和前面用户服务中对方法的要求一样:

  • 该函数是可导出的(首字母大写)
  • 该该函数必须有两个可导出或是内建类型的参数
  • 第一个参数为context.Context,第二个参数是输入参数用来接收数据,第三个参数作为输出参数且必须是指针类型
  • 函数返回类型为error

接下来在用户服务的service.go文件中增加一个函数,该函数要按照上面要求定义,否则不能注册成功:

  1. func UserReply(ctx context.Context, args *Args, reply *Reply) error {
  2. reply.User.AddTime = 10000999
  3. reply.User.Uface = "http://image.xxxx.xxx/reply.gif"
  4. reply.User.UID = int64(args.Uid)
  5. reply.User.UserName = "Reply"
  6. reply.User.UserType = 3
  7. return nil
  8. }

在服务端,即server.go文件中增加关键的一行,注册该函数到服务中心:

  1. s.RegisterFunction("ServiceUserFn", service.UserReply, "")

上面方法的第一个参数为该函数的自定义服务名,第二个参数为函数名。

接下来在客户端远程调用这个函数,在clientfn.go中主要代码如下:

  1. func main() {
  2. flag.Parse()
  3. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  4. xclient := client.NewXClient("ServiceUserFn", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  5. defer xclient.Close()
  6. args := service.Args{
  7. Uid: 888,
  8. }
  9. reply := &service.Reply{}
  10. err := xclient.Call(context.Background(), "UserReply", args, reply)
  11. if err != nil {
  12. log.Fatalf("failed to call: %v", err)
  13. }
  14. log.Println(args.Uid, ":", reply.User)
  15. }

函数的远程调用和服务方法的远程调用基本一样,接下来在命令行运行服务端程序:

  1. >go run server.go
  2. 2019/07/28 21:10:36 server.go:174: INFO : server pid:739

然后在命令行运行客户端程序:

  1. >go run clientfn.go
  2. 2019/07/28 21:11:12 888 : {888 10000999 3 http://image.xxxx.xxx/reply.gif Reply}

在客户端运行后,服务端接收到客户端请求并响应,控制台会显示:

  1. 2019/07/28 21:11:12 server.go:358: INFO : client has closed this connection: 127.0.0.1:60209

通过上面用户服务的例子可以看到,rpcx框架使用非常方便,RPC调用过程整体透明,而服务发现以及治理上只需要简单做好配置即可。这些方面对开发者而言,实在是非常的贴心。

当然,rpcx框架不止上面这些特征,还有其他非常值得了解的特性,下面继续来更深入了解和熟悉这款优秀的RPC框架。

服务注册中心

在rpcx框架中,服务注册中心用来实现服务发现和服务元数据的存储。在rpcx框架中支持多种服务注册中心, 并且支持进程内的注册中心,方便开发与测试。rpcx框架会自动将服务的服务名,监听地址,监听协议,权重等信息登记到注册中心,也会定时将服务的吞吐率更新到注册中心。

如果服务意外中断或者宕机,服务注册中心能够监测到事件发生,服务注册中心会通知客户端该服务当前不可用,在服务调用的时候不要再选择这个服务器。

客户端初始化的时候从服务注册中心得到服务器列表,然后根据不同的负载均衡模式选择合适的服务器进行服务调用,同时注册中心会通知客户端某个服务暂时不可用。

服务注册中心与客户端和服务端之间的关系可见下图4:

rpcx-4.png

图4 服务注册中心

在rpcx框架中有几种不同的服务注册中心:

一、点对点

点对点使用 NewPeer2PeerDiscovery() 来初始化服务发现,由客户端直连服务节点,客户端根据唯一服务器的地址直接连接到服务器,事实上它并没有注册中心。而由于只有一个服务节点,函数func NewXClient()在生成xClient实例时,选择器Selector的selectMode实际上并没有什么作用,因为只有一个节点什么规则最终都只会而且只能选择这个节点。

上面的用户服务中,使用的就是点对点的服务注册中心,最简单直接的方式:

  1. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")

二、点对多

点对多顾名思义同一服务部署在多台服务器,可以使用NewMultipleServersDiscovery()来发现部署服务的多台服务器。

为了测试这种服务注册,在前面用户服务的基础上建立新的服务,在appservice目录下建立membermultiple目录,暂且称为membermultiple服务。该服务业务逻辑和用户服务一样,只有服务发现有变化。

假设有两台服务器来部署这个服务,为了方便测试,这里需要通过不同的端口来模拟不同的服务器,服务端文件server.go主要代码为:

  1. var (
  2. addr0 = flag.String("addr0", "localhost:8972", "server0 address")
  3. addr1 = flag.String("addr1", "localhost:8973", "server1 address")
  4. )
  5. func main() {
  6. flag.Parse()
  7. go createServer(*addr0)
  8. go createServer(*addr1)
  9. select {}
  10. }
  11. func createServer(addr string) {
  12. s := server.NewServer()
  13. s.RegisterName("ServiceUser", new(service.ServiceUser), "")
  14. err := s.Serve("tcp", addr)
  15. if err != nil {
  16. panic(err)
  17. }
  18. }

上面代码相当于membermultiple服务同时在两台服务器上运行,而客户端采用NewMultipleServersDiscovery()方式来得到服务器信息,这里客户端采用编码的方式来配置服务器地址。

  1. var (
  2. addr = flag.String("addr0", "tcp@localhost:8972", "server0 address")
  3. addr1 = flag.String("addr1", "tcp@localhost:8973", "server1 address")
  4. )
  5. ......
  6. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr}, {Key: *addr1}})
  7. xclient := client.NewXClient("ServiceUser", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  8. defer xclient.Close()
  9. ......
  10. err := xclient.Call(context.Background(), "UserInfo", args, reply)

为了更好地观察这种方式的运行,这里稍微修改下rpcx框架中的xclient.go文件,在方法selectClient()中添加 fmt.Println(“===”, k)语句,这里K的值是访问服务器的协议以及地址,以便观察选择器最终选择服务器的结果。

接下来运行服务端程序:

  1. >go run server.go
  2. 2019/07/30 20:44:23 server.go:174: INFO : server pid:11444
  3. 2019/07/30 20:44:23 server.go:174: INFO : server pid:11444

模拟的两个服务端已经正常运行,下面运行客户端:

  1. >go run client.go
  2. === tcp@localhost:8972
  3. 2019/07/30 20:46:07 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

再次运行客户端:

  1. >go run client.go
  2. === tcp@localhost:8973
  3. 2019/07/30 20:46:41 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

可以看到两次运行客户端时,远程调用的结果一样,但访问的服务器不一样。经过多次测试发现,现在RandomSelect模式下服务器连接是随机的,并不是轮换,上面测试结果两次服务器不一样只是一种巧合。

现在修改客户端代码中的NewMultipleServersDiscovery()和NewXClient()的参数值为如下所示:

  1. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: "weight=1"}, {Key: *addr1, Value: "weight=9"}})
  2. xclient := client.NewXClient("ServiceUser", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)

改为WeightedRoundRobin按照权重轮询后,再多次运行客户端会发现,访问的服务器大多数情况都是addr1,因为它的权重是9,所以基本上都会连接到这台服务器上。

三、Etcd

说到Etcd,它是一个强一致的分布式键值存储存储系统,主要用于配置和服务发现,用它来做rpcx框架的服务注册中心是非常合适的选择。下面来具体了解怎样利用Etcd做服务注册中心。

首先需要确定已经安装好Etcd,如果没有则请先安装Etcd。

为了测试Etcd服务注册,在前面用户服务的基础上建立新的服务,在appservice目录下建立memberetcd目录,暂且称为memberetcd服务。该服务业务逻辑和用户服务一样,服务注册在Etcd上面。

下面开始搭建memberetcd服务,服务端文件server.go主要代码为:

  1. var (
  2. addr = flag.String("addr", "localhost:8972", "server address")
  3. etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
  4. basePath = flag.String("base", "/rpcx_test", "prefix path")
  5. )
  6. func main() {
  7. flag.Parse()
  8. s := server.NewServer()
  9. addRegistryPlugin(s)
  10. //s.Register(new(service.ServiceUser), "")
  11. s.RegisterName("ServiceUser", new(service.ServiceUser), "")
  12. err := s.Serve("tcp", *addr)
  13. if err != nil {
  14. panic(err)
  15. }
  16. }
  17. func addRegistryPlugin(s *server.Server) {
  18. r := &serverplugin.EtcdRegisterPlugin{
  19. ServiceAddress: "tcp@" + *addr,
  20. EtcdServers: []string{*etcdAddr},
  21. BasePath: *basePath,
  22. Metrics: metrics.NewRegistry(),
  23. UpdateInterval: time.Minute,
  24. }
  25. err := r.Start()
  26. if err != nil {
  27. log.Fatal(err)
  28. }
  29. s.Plugins.Add(r)
  30. }

这里主要实现了把服务注册到Etcd,主要通过addRegistryPlugin()函数来实现。接下来实现客户端的主要代码如下:

  1. var (
  2. etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
  3. basePath = flag.String("base", "/rpcx_test", "prefix path")
  4. )
  5. ......
  6. d := client.NewEtcdDiscovery(*basePath, "ServiceUser", []string{*etcdAddr}, nil)
  7. xclient := client.NewXClient("ServiceUser", client.Failover, client.RoundRobin, d, client.DefaultOption)
  8. defer xclient.Close()
  9. ......

这里客户端采用NewEtcdDiscovery()方式发现服务。前面在安装rpcx时建议加上标签:-tags etcd,在这里也需要用到这个编译标签。在rpcx的etcd_discovery.go文件中带有编译标签:// +build etcd ,所以使用在运行或者编译时需要注意用上这个标签。

首先启动Etcd服务,接下来运行服务端程序:

  1. >go run -tags etcd server.go
  2. 2019/08/05 21:22:38 server.go:174: INFO : server pid:11444
  3. 2019/08/05 21:22:38 server.go:174: INFO : server pid:11444

模拟服务端已经正常运行,下面运行客户端:

  1. >go run -tags etcd client.go
  2. === tcp@localhost:8972
  3. 2019/08/05 21:25:16 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

此时停掉Etcd服务,再次运行客户端,则服务端和客户端都会发生错误导致程序不能正常运行。

而再次启动Etcd服务,此时再运行客户端可以得到正常结果:

  1. >go run -tags etcd client.go
  2. === tcp@localhost:8972
  3. 2019/08/05 21:32:06 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

通过Etcd客户端可以看到注册的服务信息:

/rpcx_test/ServiceUser/tcp@localhost:8972

Etcd作为服务注册中心是可靠的,类似像ZooKeeper、Consul都可以作为可靠的服务注册中心,由于rpcx框架已经封装好了其作为服务注册中心的使用方法,因此Etcd和它们在使用上相差无几,这里就不再列举例子说明。 需要注意的是使用run命令运行或者构建应用程序时需要带上编译标签,如上面例子中 -tags etcd。

rpcx调用

在rpcx框架中,调用有下面几种方式:

  1. func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
  2. func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{},done chan *Call) (*Call, error)
  3. func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
  4. func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error

Go()方法是异步调用,在异步调用中,失败模式FailMode将会不起作用,它即时返回一个Call结构体实例。

Call()方法是同步调用,也是最常用的调用方式,它会根据选择器确定服务器,支持失败模式FailMode,可以设置Option可选项,来进行远程调用,直到服务器返回数据或者超时。

Broadcast()方法将请求发送到该服务的所有节点。如果所有的节点都正常返回才算成功。只有在所有节点没有错误的情况下, Broadcast()方法将返回其中的一个节点的返回信息。 如果有节点返回错误的话,Broadcast()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效,最好设置超时避免程序挂起。

Fork()方法将请求发送到该服务的所有节点。如果有任何一个节点正常返回,则成功,Fork()方法将返回其中的一个节点的返回结果。 如果所有节点返回错误的话,Fork()方法将返回这些错误信息中的一个。失败模式FailMode和路由选择SelectMode在该方法中都不会生效。

还是在用户服务的基础上来看看Fork()方法的实际运行情况,在用户服务目录下cmd目录中新建fork目录,作为Fork()方法的测试目录。

服务端模拟两个服务器:

  1. var (
  2. addr0 = flag.String("addr0", "localhost:8972", "server0 address")
  3. addr1 = flag.String("addr1", "localhost:8973", "server1 address")
  4. )

客户端使用多点服务发现,再使用Fork()方法:

  1. var (
  2. addr = flag.String("addr0", "tcp@localhost:8972", "server0 address")
  3. addr1 = flag.String("addr1", "tcp@localhost:8973", "server1 address")
  4. )
  5. func main() {
  6. flag.Parse()
  7. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: "weight=1"}, {Key: *addr1, Value: "weight=9"}})
  8. xclient := client.NewXClient("ServiceUser", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)
  9. defer xclient.Close()
  10. args := service.Args{
  11. Uid: 999,
  12. }
  13. reply := &service.Reply{}
  14. err := xclient.Fork(context.Background(), "UserInfo", args, reply)
  15. if err != nil {
  16. log.Fatalf("failed to call: %v", err)
  17. }
  18. log.Println(args.Uid, ":", reply.User)
  19. }

先在命令行运行服务端:

  1. >go run server.go
  2. 2019/08/17 15:47:36 server.go:174: INFO : server pid:9956
  3. 2019/08/17 15:47:36 server.go:174: INFO : server pid:9956

然后运行客户端:

  1. >go run client.go
  2. 2019/08/17 15:47:50 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

而此时服务端控制台显示:

  1. service: 999
  2. service: 999
  3. 2019/08/17 15:47:50 server.go:358: INFO : client has closed this connection: 127.0.0.1:58487
  4. 2019/08/17 15:47:50 server.go:358: INFO : client has closed this connection: 127.0.0.1:58489

表明Fork()方法将请求发送到了这两个服务器,得到正常结果后返回。而当服务器全部停止服务时,则Fork()方法直接报ErrXClientNoServer错误信息。

对于RPC来说,序列化对于远程调用的响应速度、吞吐量、网络带宽消耗等也起着至关重要的作用,是分布式系统性能提升的关键因素之一。在rpcx框架中,默认使用 msgpack 编解码器,一共有下面几种编解码器:

SerializeNone:不会对数据进行编解码,要求数据为 []byte 类型。JSON:通用的数据交换的格式,常规情况下可使用这种编解码。protocol buffers:一种高性能的编解码器。MsgPack:默认的编解码器,一种高性能的编解码器,是跨语言的编解码器。Thrift:一种高性能的编解码器。

开发中可以设置Option.SerializeType来指定合适的编解码器。对于有特殊要求的场景,还可以定制新的编解码器。

编解码也即序列化/反序列化,在rpcx中需要将消息结构体序列化为二进制数据,同时也需要将网络流数据反序列化为内部使用的消息结构体。大致如下图5所示:

rpcx-5.png

图5 rpcx编解码

由于在gRPC中只能使用ProtoBuf,因此看看在rpcx中怎样来使用ProtoBuf,下面基于前面的用户服务来实现,新建服务memberproto。

由于有ProtoBuf,建立pb目录来存放member.proto文件:

  1. syntax = "proto3";
  2. package pb;
  3. message Args {
  4. int64 Id = 1;
  5. }
  6. message Reply {
  7. int64 UID =1;
  8. int64 AddTime=2;
  9. int32 UserType=3;
  10. string Uface =4;
  11. string UserName=5;
  12. }
  13. message ProtoArgs {
  14. int32 A = 1;
  15. int32 B = 2;
  16. }
  17. message ProtoReply {
  18. int32 C = 1;
  19. }

运行命令:

  1. protoc --go_out=. member.proto

得到member.pb.go文件,接下来修改service.g文件代码,这次增加了一个方法Mul():

  1. func (s *ServiceUser) UserInfo(ctx context.Context, args *pb.Args, reply *pb.Reply) error {
  2. fmt.Println("service:", args.Id)
  3. reply.AddTime = 14990093
  4. reply.Uface = "http://image.xxxx.xxx/t.gif"
  5. reply.UID = int64(args.Id)
  6. reply.UserName = "Joke"
  7. reply.UserType = 2
  8. return nil
  9. }
  10. func (t *ServiceUser) Mul(ctx context.Context, args *pb.ProtoArgs, reply *pb.ProtoReply) error {
  11. reply.C = args.A * args.B
  12. fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
  13. return nil
  14. }

客户端需要修改Option可选项,把默认的编解码器改为protocol.ProtoBuffer:

  1. func main() {
  2. flag.Parse()
  3. // register customized codec
  4. option := client.DefaultOption
  5. option.SerializeType = protocol.ProtoBuffer
  6. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  7. xclient := client.NewXClient("ServiceUser", client.Failover, client.RandomSelect, d, client.DefaultOption)
  8. defer xclient.Close()
  9. args1 := &pb.ProtoArgs{
  10. A: 10,
  11. B: 20,
  12. }
  13. reply1 := &pb.ProtoReply{}
  14. err := xclient.Call(context.Background(), "Mul", args1, reply1)
  15. args := &pb.Args{
  16. Id: 999,
  17. }
  18. reply := &pb.Reply{}
  19. err = xclient.Call(context.Background(), "UserInfo", args, reply)
  20. if err != nil {
  21. log.Fatalf("failed to call: %v", err)
  22. }
  23. log.Printf("%d * %d = %d", args1.A, args1.B, reply1.C)
  24. log.Println(args.Id, ":", reply)
  25. }

接下来运行服务端程序:

  1. >go run server.go
  2. 2019/08/17 23:05:22 server.go:174: INFO : server pid:7124

然后运行客户端:

  1. >go run client.go
  2. 2019/08/17 23:05:26 10 * 20 = 200
  3. 2019/08/17 23:05:26 999 : UID:999 AddTime:14990093 UserType:2 Uface:"http://image.xxxx.xxx/t.gif" UserName:"Joke"

很明显新设置的protocol.ProtoBuffer编解码器生效了。由于ProtoBuf使用上更加麻烦,而且和MsgPack相比反倒是MsgPack更有优势,所以在rpcx中默认使用MsgPack也就有了很好的理由。

在rpcx中还可以定制新的编解码器,下面以gob作为新的编解码器,新建服务membergob来实验一下。

首先修改service.go文件,在里面加入gob编解码两个方法:

  1. type GobCodec struct {
  2. }
  3. func (c *GobCodec) Decode(data []byte, i interface{}) error {
  4. enc := gob.NewDecoder(bytes.NewBuffer(data))
  5. err := enc.Decode(i)
  6. return err
  7. }
  8. func (c *GobCodec) Encode(i interface{}) ([]byte, error) {
  9. var buf bytes.Buffer
  10. enc := gob.NewEncoder(&buf)
  11. err := enc.Encode(i)
  12. return buf.Bytes(), err
  13. }

在服务端server.go文件中增加新的gob编解码器:

  1. share.Codecs[protocol.SerializeType(5)] = &service.GobCodec{}

在客户端client.go文件中增加新的gob编解码器,同时修改Option选项中的SerializeType为新增的gob编解码器:

  1. share.Codecs[protocol.SerializeType(5)] = &service.GobCodec{}
  2. option := client.DefaultOption
  3. option.SerializeType = protocol.SerializeType(5)

现在可以运行服务端程序:

  1. >go run server.go
  2. 2019/08/17 21:42:57 server.go:174: INFO : server pid:2588

然后运行客户端:

  1. >go run client.go
  2. 2019/08/17 21:44:48 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

而此时服务端控制台显示:

  1. service: 999
  2. 2019/08/17 21:47:50 server.go:358: INFO : client has closed this connection: 127.0.0.1:62004

通过抓包可以看到端口62004与8972之间存在通信,如下图6所示:

rpcx-6.png

图6 gob编解码

如果需要新增其他的编解码器,只需要先实现编解码器,主要实现了Encode和Decode方法,就相当于有了新的编解码器,把新的编解码加入到rpcx中就非常容易了。所以,有兴趣的话读者可以自己尝试一下。

超时设置

随着对rpcx框架有了更多的了解,现在开始对框架进行更深一些入细致的了解,更多的细节弄清楚有助于我们更全面了解rpcx框架。比如在客户端和服务端,可以设置超时。

超时机制可认为是一种保护机制,避免服务陷入无限的等待之中。在给定的时间没有响应则服务调用就进入下一个状态,或者重试、或者立即返回错误。

在服务端,主要通过OptionFn来设置两种超时,分别是读超时readTimeout和写超时writeTimeout:

  1. func WithReadTimeout(readTimeout time.Duration) OptionFn
  2. func WithWriteTimeout(writeTimeout time.Duration) OptionFn

既可以在实例化服务时使用NewServer(options …OptionFn),也可以直接使用WithReadTimeout()等函数来直接设置。

在客户端可在Option中设置几个超时值:

  1. type Option struct {
  2. ……
  3. // 连接超时
  4. ConnectTimeout time.Duration
  5. // 读超时
  6. ReadTimeout time.Duration
  7. // 写超时
  8. WriteTimeout time.Duration
  9. ……
  10. }

在DefaultOption 中设置了连接超时值为 10 秒,但并没有设置 ReadTimeout 和 WriteTimeout。没有设置的超时字段,可以根据情况来设置,但一般默认就可以了。

在客户端中,使用context.Context也可以来控制超时,如使用context.WithTimeout 来设置超时时间,这是在客户端推荐的一种设置超时方式。

  1. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

元数据

在上面的远程RPC调用中,远程调用的方法可以通过参数来传递数据。在rpcx中,客户端和服务端可以互相传递元数据。元数据指的也具体业务无关的基础数据。在rpcx中元数据是一个键值队的列表,键和值都是字符串。

在服务器端注册服务的方法中有个设置元数据的参数metadata:

  1. func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error

参数metadata一般为空字符串,但可以为服务增加一些元数据。比如服务状态(state)就是其中一类元数据,如果在元数据中设置了state=inactive,客户端将不能访问这些服务。这可以帮忙程序员临时禁用一些服务,大致实现服务的降级。

下面实际来看看具体的例子,基于前面的用户服务来实现,新建服务membermeta,在cmd目录中建立state目录,该目录下分别建立server和client目录。大致结构如下:

  1. membermeta
  2. ├─cmd
  3. └─state
  4. ├─client
  5. client.go
  6. └─server
  7. server.go
  8. ├─model
  9. member.go
  10. └─service
  11. service.go

修改server.go文件,在RegisterName()方法中设置元数据state=inactive:

  1. s.RegisterName("ServiceUser", new(service.ServiceUser), "state=inactive")

修改client.go文件中代码:

  1. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: "state=inactive"}, {Key: *addr1, Value: "state=inactive"}})

运行服务端程序:

  1. >go run server.go
  2. 2019/08/18 22:18:01 server.go:174: INFO : server pid:17696
  3. 2019/08/18 22:18:01 server.go:174: INFO : server pid:17696

然后运行客户端:

  1. >go run client.go
  2. 2019/08/18 22:18:06 connection.go:91: WARN : failed to dial server: dial tcp: missing address
  3. 2019/08/18 22:18:06 connection.go:91: WARN : failed to dial server: dial tcp: missing address
  4. 2019/08/18 22:18:06 connection.go:91: WARN : failed to dial server: dial tcp: missing address
  5. 2019/08/18 22:18:06 failed to call: can not found any server

可以看到客户都已经不能正常与服务端通信了,说明设置元数据state=inactive生效。这种通过设置元数据来修改服务状态的方法,在客户端如果不设置,即使服务端设置了也不会生效。

例如把客户端分别修改为下面三种情况:

  1. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr}, {Key: *addr1}})
  2. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr}, {Key: *addr1, Value: "state=inactive"}})
  3. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: "state=inactive"}, {Key: *addr1}})

在服务端不改变情况下,分别运行三次客户端:

  1. >go run client.go
  2. === tcp@localhost:8972
  3. 2019/08/18 22:23:38 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}
  1. >go run client.go
  2. === tcp@localhost:8972
  3. 2019/08/18 22:23:55 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}
  1. >go run client.go
  2. === tcp@localhost:8973
  3. 2019/08/18 22:24:13 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

第三种情况很明显,第一个服务器服务状态为inactive,只有第二个服务器正常,所以由它提供服务。

而甚至如果服务端不设置元数据state=inactive,客户端设置了元数据state=inactive也依然生效。如服务端不设置元数据state=inactive,而客户端设置为:

d := client.NewMultipleServersDiscovery([]client.KVPair{{Key: addr, Value: “state=inactive”}, {Key: *addr1, Value: “state=inactive”}})

则客户端运行结果依然是能正常与服务端通信。虽然服务端可不用设置,但建议还是设置这个参数。但如果客户端不设置这个元数据,则服务端无论怎样设置都不会起作用,这个必须要注意。

和服务状态类似的还有分组(Group)元数据,在初始化客户端实例时,NewXClient()函数中会调用下面的函数filterByStateAndGroup(),这个函数会检查服务状态和分组两个元数据信息,根据情况来把对应服务器从列表中删除:

  1. func filterByStateAndGroup(group string, servers map[string]string) {
  2. for k, v := range servers {
  3. if values, err := url.ParseQuery(v); err == nil {
  4. if state := values.Get("state"); state == "inactive" {
  5. delete(servers, k)
  6. }
  7. if group != "" && group != values.Get("group") {
  8. delete(servers, k)
  9. }
  10. }
  11. }
  12. }

为了测试分组元数据的使用,在上面的服务membermeta的cmd目录中新建group目录,分别建立server目录和client目录。

对server.go做简单修改,仅仅只添加metadata参数:

  1. s.RegisterName("ServiceUser", new(service.ServiceUser), "group=Member")

对client.go文件主要修改如下:

  1. ......
  2. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: ""}, {Key: *addr1, Value: "group=Member"}})
  3. option := client.DefaultOption
  4. option.Group = "Me"
  5. xclient := client.NewXClient("ServiceUser", client.Failover, client.RoundRobin, d, option)
  6. defer xclient.Close()
  7. ......
  8. `

运行服务端程序:

  1. >go run server.go
  2. 2019/08/19 00:02:06 server.go:174: INFO : server pid:7400
  3. 2019/08/19 00:02:06 server.go:174: INFO : server pid:7400

然后运行客户端:

  1. >go run client.go
  2. 2019/08/19 00:14:06 failed to call: can not found any server

由于在服务端设置的元数据“group=Member”,而在客户端设置的option.Group = “Me”,客户端和服务端不在一组,所以客户端不能访问到服务器。

此时再修改下client.go文件,修改客户端设置的分组为服务端设置的分组值:

  1. option.Group = "Member"

再运行客户端:

  1. >go run client.go
  2. === tcp@localhost:8973
  3. 2019/08/19 00:21:38 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

和预想中基本一样,由于服务端和客户端都设置在同一分组,而且在服务发现中指定8973端口的服务器为分组Member,所以应该只有这台服务器可以访问,而且运行结果也证实了这一点。

如果此时把服务发现的分组元数据指定为其他值:

  1. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr, Value: ""}, {Key: *addr1, Value: "group=009"}}

则客户端运行结果:

  1. >go run client.go
  2. 2019/08/19 00:22:52 failed to call: can not found any server

可以看到,运行结果失败,原因是由于该分组没有服务器,所以调用服务失败。所以如果在服务发现中指定分组值,与服务端也需要保持一致。

和服务状态一样,分组也可以在客户端避开。如果在客户端不设置Group这个可选项,其实分组的限制是不起作用的:

  1. option := client.DefaultOption
  2. option.Group = "Member"

把客户端的这个设置取消掉,改为:

  1. option := client.DefaultOption

然后再运行客户端程序:

  1. go run client.go
  2. === tcp@localhost:8972
  3. 2019/08/19 00:33:08 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}
  4. `

虽然服务发现中指定的元数据和服务端不一样,但是由于option.Group 没有设置,分组的限制没有生效。

网关

在rpcx框架中,可以通过网关(Gateway)的方式来实现跨语言的调用,比如Java、Python、C#、Node.js、Php、C\C++、Rust等来调用 rpcx 服务。如图7所示:

rpcx-7.png

图7 网关

使用网关程序有两种部署模式Gateway和Agent。

1.Gateway:网关模式需要将网关程序独立部署,所有http请求都将先发送给网关,网关将其转换为rpcx请求,再来调用相关rpcx服务,并将rpcx的返回结果转换成http响应, 最终返回给用户。

2.Agent:代理模式是将网关程序和客户端程序一起部署,代理作为一个后台服务部署。客户端发送http请求到本地的代理, 本地的代理将请求转为rpcx请求,并转发到相应的rpcx服务,最后将rpcx的返回结果转换为http响应返回给客户端,类似于Istio中的Sidecar。

下面来实际演示一下网关,在发送http请求时,需要额外设置一些Header信息:

  1. X-RPCX-Version: rpcx 版本
  2. X-RPCX-MesssageType: 设置为0,代表请求
  3. X-RPCX-Heartbeat: 是否是心跳请求, 缺省false
  4. X-RPCX-Oneway: 是否是单向请求, 缺省false.
  5. X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
  6. X-RPCX-MessageID: 消息id, uint64 类型
  7. X-RPCX-ServicePath: service path
  8. X-RPCX-ServiceMethod: service method
  9. X-RPCX-Meta: 额外的元数据

而对于 http 响应,也有对应的Header信息:

  1. X-RPCX-Version: rpcx 版本
  2. X-RPCX-MesssageType: 1 ,代表响应
  3. X-RPCX-Heartbeat: 是否是heartbeat请求
  4. X-RPCX-MessageStatusType: Error 还是正常返回结果
  5. X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
  6. X-RPCX-MessageID: 消息id, uint64 类型
  7. X-RPCX-ServicePath: service path
  8. X-RPCX-ServiceMethod: service method
  9. X-RPCX-Meta: 额外的元数据
  10. X-RPCX-ErrorMessage: 错误信息

在用户服务的基础上来看看网关的实际运行情况,在用户服务目录下cmd目录中新建gateway目录,作为测试目录。

在client目录中增加一个文件httpclient.go,通过这个文件来测试网关连接rpcx服务:

  1. func main() {
  2. cc := &codec.MsgpackCodec{}
  3. args := service.Args{
  4. Uid: 999,
  5. }
  6. data, _ := cc.Encode(args)
  7. req, err := http.NewRequest("POST", "http://127.0.0.1:8972/", bytes.NewReader(data))
  8. if err != nil {
  9. log.Fatal("failed to create request: ", err)
  10. return
  11. }
  12. h := req.Header
  13. h.Set(gateway.XMessageID, "10000")
  14. h.Set(gateway.XMessageType, "0")
  15. h.Set(gateway.XSerializeType, "3")
  16. h.Set(gateway.XServicePath, "ServiceUser")
  17. h.Set(gateway.XServiceMethod, "UserInfo")
  18. res, err := http.DefaultClient.Do(req)
  19. if err != nil {
  20. log.Fatal("failed to call: ", err)
  21. }
  22. defer res.Body.Close()
  23. // handle http response
  24. replyData, err := ioutil.ReadAll(res.Body)
  25. if err != nil {
  26. log.Fatal("failed to read response: ", err)
  27. }
  28. reply := &service.Reply{}
  29. err = cc.Decode(replyData, reply)
  30. if err != nil {
  31. log.Fatal("failed to decode reply: ", err)
  32. }
  33. log.Println(args.Uid, ":", reply.User)
  34. }

运行服务端程序:

  1. >go run server.go
  2. 2019/08/19 20:53:51 server.go:174: INFO : server pid:16136

然后运行含有网关的客户端:

  1. >go run httpclient.go
  2. 2019/08/19 21:02:43 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}

可以看到http响应正常。

断路器(Circuit Breaker)

在rpcx中,可能出现一个节点宕机的情况,可以使用断路器(Circuit Breaker)模式来避免这个错误影响到其他服务,防止出现雪崩情况。

客户端通过断路器调用服务, 一旦连续的错误达到一个阈值,断路器就会断开进行保护,如果继续调用这个节点,系统直接返回错误。经过一段时间,断路器会处于半开的状态,允许一定数量的请求尝试发送到这个节点,如果正常访问,断路器就处于全开的状态,否则又进入短路的状态。

在rpcx 这样定义了断路器 Breaker 接口:

  1. // Breaker is a CircuitBreaker interface.
  2. type Breaker interface {
  3. Call(func() error, time.Duration) error
  4. Fail()
  5. Success()
  6. Ready() bool
  7. }

在rpcx 中只提供了一个简单的断路器 ConsecCircuitBreaker,实现代码保存在circuit_breaker.go文件中,它在连续发生规定数量的故障或超时后跳闸,再经过一段时间后打开。

  1. option := client.DefaultOption
  2. option.GenBreaker = func() client.Breaker { return client.NewConsecCircuitBreaker(5, 30*time.Second) }

还是在用户服务的cmd目录中建立breaker目录来演示断路器在rpcx中的作用。通过在服务端两个端口只启动一个服务的简单模拟故障发生:

  1. var (
  2. addr0 = flag.String("addr0", "localhost:8972", "server0 address")
  3. addr1 = flag.String("addr1", "localhost:8973", "server1 address")
  4. )
  5. func main() {
  6. flag.Parse()
  7. go createServer(*addr0)
  8. //go createServer(*addr1)
  9. select {}
  10. }
  11. func createServer(addr string) {
  12. s := server.NewServer()
  13. s.RegisterName("ServiceUser", new(service.ServiceUser), "")
  14. err := s.Serve("tcp", addr)
  15. if err != nil {
  16. panic(err)
  17. }
  18. }

客户端需要在Option中指定GenBreaker,这里指定rpcx自带的断路器,这个断路器只有一种触发的条件,即连续多次触发断路,这里设定为2次:

  1. var (
  2. addr = flag.String("addr", "localhost:8972", "server address")
  3. addr1 = flag.String("addr1", "localhost:8973", "server1 address")
  4. )
  5. func main() {
  6. flag.Parse()
  7. option := client.DefaultOption
  8. option.GenBreaker = func() client.Breaker { return client.NewConsecCircuitBreaker(2, 30*time.Second) }
  9. d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr}, {Key: *addr1}})
  10. option.Retries = 5
  11. xclient := client.NewXClient("ServiceUser", client.Failtry, client.RandomSelect, d, option)
  12. defer xclient.Close()
  13. args := service.Args{
  14. Uid: 999,
  15. }
  16. for i := 0; i < 100; i++ {
  17. reply := &service.Reply{}
  18. err := xclient.Call(context.Background(), "UserInfo", args, reply)
  19. if err != nil {
  20. log.Printf("failed to call: %v", err)
  21. }
  22. log.Println(args.Uid, ":", reply.User)
  23. time.Sleep(time.Second)
  24. }
  25. }

运行服务端程序:

  1. >go run server.go
  2. 2019/08/20 16:51:30 server.go:174: INFO : server pid:20496

然后运行含有断路器的客户端:

  1. >go run client.go
  2. === localhost:8973
  3. 2019/08/20 18:29:42 connection.go:91: WARN : failed to dial server: dial tcp [::1]:8973: connectex: No connection could be made because the target machine actively refused it.
  4. 2019/08/20 18:29:43 connection.go:91: WARN : failed to dial server: dial tcp [::1]:8973: connectex: No connection could be made because the target machine actively refused it.
  5. 2019/08/20 18:29:43 failed to call: dial tcp [::1]:8973: connectex: No connection could be made because the target machine actively refused it.
  6. 2019/08/20 18:29:43 999 : {0 0 0 }
  7. === localhost:8973
  8. 2019/08/20 18:29:44 failed to call: breaker open
  9. 2019/08/20 18:29:44 999 : {0 0 0 }
  10. === localhost:8973
  11. 2019/08/20 18:29:45 failed to call: breaker open
  12. 2019/08/20 18:29:45 999 : {0 0 0 }
  13. === localhost:8972
  14. 2019/08/20 18:29:46 999 : {999 14990093 2 http://image.xxxx.xxx/t.gif Joke}
  15. === localhost:8973
  16. 2019/08/20 18:29:47 failed to call: breaker open
  17. 2019/08/20 18:29:47 999 : {0 0 0 }

客户端设置的失败模式是Failtry,所以重试两次后触发断路器,在断路器生效期间,再次调用则显示breaker open,表明断路器在有效期,这期间不会继续调用已经出问题的服务, 从而达到保护的目的,整个系统不会出现因为超时而产生的雪崩式连锁反应。

另外有开源包:github.com/rubyist/circuitbreaker,提供更多的断路器:

  1. func NewBreaker() *Breaker // 空断路器
  2. func NewThresholdBreaker(threshold int64) *Breaker // 失败次数
  3. func NewConsecutiveBreaker(threshold int64) *Breaker // 连续失败次数
  4. func NewRateBreaker(rate float64, minSamples int64) *Breaker // 根据失败比率

把客户端代码稍微修改下,改为使用circuitbreaker来做断路器:

  1. option := client.DefaultOption
  2. option.GenBreaker = func() client.Breaker {
  3. return circuit.NewBreakerWithOptions(&circuit.Options{
  4. ShouldTrip: circuit.ThresholdTripFunc(2),
  5. WindowTime: 30 * time.Second,
  6. })
  7. }

运行客户端,也可以实现断路器模式:

  1. >go run client.go
  2. ......
  3. 2019/08/20 19:30:06 failed to call: dial tcp [::1]:8973: connectex: No connection could be made because the target machine actively refused it.
  4. 2019/08/20 19:30:06 999 : {0 0 0 }
  5. === localhost:8973
  6. 2019/08/20 19:30:07 failed to call: breaker open
  7. 2019/08/20 19:30:07 999 : {0 0 0 }