接下来我们就需要给Zinx添加消息队列和多任务Worker机制了。我们可以通过worker的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,虽然我们知道go的调度算法已经做的很极致了,但是大数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。我们可以用消息队列来缓冲worker工作的数据。

初步我们的设计结构如下图:

4-Zinx-V0.8多任务消息队列.jpeg

8.1 创建消息队列

首先,处理消息队列的部分,我们应该集成到MsgHandler模块下,因为属于我们消息模块范畴内的

zinx/znet/msghandler.go

  1. type MsgHandle struct {
  2. Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
  3. WorkerPoolSize uint32 //业务工作Worker池的数量
  4. TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
  5. }
  6. func NewMsgHandle() *MsgHandle {
  7. return &MsgHandle{
  8. Apis: make(map[uint32]ziface.IRouter),
  9. WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
  10. //一个worker对应一个queue
  11. TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
  12. }
  13. }

这里添加两个成员

WokerPoolSize:作为工作池的数量,因为TaskQueue中的每个队列应该是和一个Worker对应的,所以我们在创建TaskQueue中队列数量要和Worker的数量一致。

TaskQueue真是一个Request请求信息的channel集合。用来缓冲提供worker调用的Request请求信息,worker会从对应的队列中获取客户端的请求数据并且处理掉。

当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。

zinx/utils/globalobj.go

  1. /*
  2. 存储一切有关Zinx框架的全局参数,供其他模块使用
  3. 一些参数也可以通过 用户根据 zinx.json来配置
  4. */
  5. type GlobalObj struct {
  6. /*
  7. Server
  8. */
  9. TcpServer ziface.IServer //当前Zinx的全局Server对象
  10. Host string //当前服务器主机IP
  11. TcpPort int //当前服务器主机监听端口号
  12. Name string //当前服务器名称
  13. /*
  14. Zinx
  15. */
  16. Version string //当前Zinx版本号
  17. MaxPacketSize uint32 //都需数据包的最大值
  18. MaxConn int //当前服务器主机允许的最大链接个数
  19. WorkerPoolSize uint32 //业务工作Worker池的数量
  20. MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
  21. /*
  22. config file path
  23. */
  24. ConfFilePath string
  25. }
  26. //...
  27. //...
  28. /*
  29. 提供init方法,默认加载
  30. */
  31. func init() {
  32. //初始化GlobalObject变量,设置一些默认值
  33. GlobalObject = &GlobalObj{
  34. Name: "ZinxServerApp",
  35. Version: "V0.4",
  36. TcpPort: 7777,
  37. Host: "0.0.0.0",
  38. MaxConn: 12000,
  39. MaxPacketSize: 4096,
  40. ConfFilePath: "conf/zinx.json",
  41. WorkerPoolSize: 10,
  42. MaxWorkerTaskLen: 1024,
  43. }
  44. //从配置文件中加载一些用户配置的参数
  45. GlobalObject.Reload()
  46. }

8.2 创建及启动Worker工作池

现在添加Worker工作池,先定义一些启动工作池的接口

zinx/ziface/imsghandler.go

  1. /*
  2. 消息管理抽象层
  3. */
  4. type IMsgHandle interface{
  5. DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
  6. AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
  7. StartWorkerPool() //启动worker工作池
  8. SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
  9. }

zinx/znet/msghandler.go

  1. //启动一个Worker工作流程
  2. func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
  3. fmt.Println("Worker ID = ", workerID, " is started.")
  4. //不断的等待队列中的消息
  5. for {
  6. select {
  7. //有消息则取出队列的Request,并执行绑定的业务方法
  8. case request := <-taskQueue:
  9. mh.DoMsgHandler(request)
  10. }
  11. }
  12. }
  13. //启动worker工作池
  14. func (mh *MsgHandle) StartWorkerPool() {
  15. //遍历需要启动worker的数量,依此启动
  16. for i:= 0; i < int(mh.WorkerPoolSize); i++ {
  17. //一个worker被启动
  18. //给当前worker对应的任务队列开辟空间
  19. mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
  20. //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
  21. go mh.StartOneWorker(i, mh.TaskQueue[i])
  22. }
  23. }

StartWorkerPool()方法是启动Worker工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个Worker分配一个TaskQueue,然后用一个goroutine来承载一个Worker的工作业务。

StartOneWorker()方法就是一个Worker的工作业务,每个worker是不会退出的(目前没有设定worker的停止工作机制),会永久的从对应的TaskQueue中等待消息,并处理。

8.3 发送消息给消息队列

现在,worker工作池已经准备就绪了,那么就需要有一个给到worker工作池消息的入口,我们再定义一个方法

zinx/ziface/imsghandler.go

  1. //将消息交给TaskQueue,由worker进行处理
  2. func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
  3. //根据ConnID来分配当前的连接应该由哪个worker负责处理
  4. //轮询的平均分配法则
  5. //得到需要处理此条连接的workerID
  6. workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
  7. fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
  8. //将请求消息发送给任务队列
  9. mh.TaskQueue[workerID] <- request
  10. }

SendMsgToTaskQueue()作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个worker处理该链接的请求处理,整理用的是一个简单的求模运算。用余数和workerID的匹配来进行分配。

最终将request请求数据发送给对应worker的TaskQueue,那么对应的worker的Goroutine就会处理该链接请求了。

8.4 Zinx-V0.8代码实现

好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。

zinx/znet/server.go

  1. //开启网络服务
  2. func (s *Server) Start() {
  3. //...
  4. //开启一个go去做服务端Linster业务
  5. go func() {
  6. //0 启动worker工作池机制
  7. s.msgHandler.StartWorkerPool()
  8. //1 获取一个TCP的Addr
  9. addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
  10. if err != nil {
  11. fmt.Println("resolve tcp addr err: ", err)
  12. return
  13. }
  14. //...
  15. //...
  16. }
  17. }()
  18. }

其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。

所以应该在Connection的StartReader()方法中修改:

zinx/znet/connection.go

  1. /*
  2. 读消息Goroutine,用于从客户端中读取数据
  3. */
  4. func (c *Connection) StartReader() {
  5. fmt.Println("Reader Goroutine is running")
  6. defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
  7. defer c.Stop()
  8. for {
  9. // 创建拆包解包的对象...
  10. //读取客户端的Msg head...
  11. //拆包,得到msgid 和 datalen 放在msg中...
  12. //根据 dataLen 读取 data,放在msg.Data中...
  13. //得到当前客户端请求的Request数据
  14. req := Request{
  15. conn:c,
  16. msg:msg,
  17. }
  18. if utils.GlobalObject.WorkerPoolSize > 0 {
  19. //已经启动工作池机制,将消息交给Worker处理
  20. c.MsgHandler.SendMsgToTaskQueue(&req)
  21. } else {
  22. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  23. go c.MsgHandler.DoMsgHandler(&req)
  24. }
  25. }
  26. }

这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。

8.5 使用Zinx-V0.8完成应用程序

测试代码和V0.6、V0.7的代码一样。因为Zinx框架对外接口没有发生改变。

我们分别启动Server、Client

  1. $go run Server.go
  1. $go run Client0.go
  1. $go run Client1.go
  1. $go run Client0.go

结果:

服务端:

  1. $ go run Server.go
  2. Add api msgId = 0
  3. Add api msgId = 1
  4. [START] Server name: zinx v-0.8 demoApp,listenner at IP: 127.0.0.1, Port 7777 is starting
  5. [Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
  6. Worker ID = 4 is started.
  7. start Zinx server zinx v-0.8 demoApp succ, now listenning...
  8. Worker ID = 9 is started.
  9. Worker ID = 0 is started.
  10. Worker ID = 5 is started.
  11. Worker ID = 6 is started.
  12. Worker ID = 1 is started.
  13. Worker ID = 2 is started.
  14. Worker ID = 7 is started.
  15. Worker ID = 8 is started.
  16. Worker ID = 3 is started.
  17. Reader Goroutine is running
  18. Add ConnID= 0 request msgID= 0 to workerID= 0
  19. Call PingRouter Handle
  20. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
  21. Reader Goroutine is running
  22. Add ConnID= 1 request msgID= 1 to workerID= 1
  23. Call HelloZinxRouter Handle
  24. recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
  25. Add ConnID= 0 request msgID= 0 to workerID= 0
  26. Call PingRouter Handle
  27. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
  28. Reader Goroutine is running
  29. Add ConnID= 2 request msgID= 0 to workerID= 2
  30. Call PingRouter Handle
  31. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
  32. Add ConnID= 1 request msgID= 1 to workerID= 1
  33. Call HelloZinxRouter Handle
  34. recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
  35. Add ConnID= 0 request msgID= 0 to workerID= 0
  36. Call PingRouter Handle
  37. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
  38. Add ConnID= 2 request msgID= 0 to workerID= 2
  39. Call PingRouter Handle
  40. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
  41. Add ConnID= 1 request msgID= 1 to workerID= 1
  42. Call HelloZinxRouter Handle
  43. recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
  44. Add ConnID= 0 request msgID= 0 to workerID= 0
  45. Call PingRouter Handle
  46. recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message

客户端0

  1. $ go run Client0.go
  2. Client Test ... start
  3. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
  4. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
  5. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
  6. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping

客户端1

  1. $ go run Client1.go
  2. Client Test ... start
  3. ==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
  4. ==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
  5. ==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8

客户端2

  1. $ go run Client0.go
  2. Client Test ... start
  3. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
  4. ==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping