Broker
broker 这个基本上已经成为 Pub/Sub 消息队列的术语,这个任务调度系统就是通过消息队列进行任务分发的,这里的 Broker 就是代表消息队列的实例了。
图一:iface broker
通过 interface 的方式,machinery 支持多种类型的消息队列,ampq(高级消息队列协议),Google Cloud Pub/Sub,AWS SQS,redis。我们这里以 redis 为例,研究一下 broker 的实现。
这里的 redis borker 并没有使用 redis 提供的 Pub/Sub 功能,而是使用的 LIST 和 ZSET,其中比较复杂的是 DelayedTasks,其实就是在 ZSET 中按照到期时间进行排序,时间到了之后弹出再添加到 LIST 中。
图二:redis broker
另一部分是 Consumer 的实现,这里的重点是对并发度的限制,这里三个 channel 的容量都是并发度的大小。
图三:redis StartConsuming
for i := 0; i < concurrency; i++ {pool <- struct{}{}}go func() {log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")for {select {// A way to stop this goroutine from b.StopConsumingcase <-b.GetStopChan():close(deliveries)returncase <-pool:select {case <-b.GetStopChan():close(deliveries)returndefault:}if taskProcessor.PreConsumeHandler() {task, _ := b.nextTask(getQueue(b.GetConfig(), taskProcessor))//TODO: should this error be ignored?if len(task) > 0 {deliveries <- task}}pool <- struct{}{}}}}()
go func() {for i := 0; i < concurrency; i++ {pool <- struct{}{}}}()for {select {case err := <-errorsChan:return errcase d, open := <-deliveries:if !open {return nil}if concurrency > 0 {// get execution slot from pool (blocks until one is available)select {case <-b.GetStopChan():b.requeueMessage(d, taskProcessor)continuecase <-pool:}}b.processingWG.Add(1)// Consume the task inside a goroutine so multiple tasks// can be processed concurrentlygo func() {if err := b.consumeOne(d, taskProcessor); err != nil {errorsChan <- err}b.processingWG.Done()if concurrency > 0 {// give slot back to poolpool <- struct{}{}}}()}}
