Broker
broker 这个基本上已经成为 Pub/Sub 消息队列的术语,这个任务调度系统就是通过消息队列进行任务分发的,这里的 Broker 就是代表消息队列的实例了。
图一:iface broker
通过 interface 的方式,machinery 支持多种类型的消息队列,ampq(高级消息队列协议),Google Cloud Pub/Sub,AWS SQS,redis。我们这里以 redis 为例,研究一下 broker 的实现。
这里的 redis borker 并没有使用 redis 提供的 Pub/Sub 功能,而是使用的 LIST 和 ZSET,其中比较复杂的是 DelayedTasks,其实就是在 ZSET 中按照到期时间进行排序,时间到了之后弹出再添加到 LIST 中。
图二:redis broker
另一部分是 Consumer 的实现,这里的重点是对并发度的限制,这里三个 channel 的容量都是并发度的大小。
图三:redis StartConsuming
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
go func() {
log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")
for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
close(deliveries)
return
case <-pool:
select {
case <-b.GetStopChan():
close(deliveries)
return
default:
}
if taskProcessor.PreConsumeHandler() {
task, _ := b.nextTask(getQueue(b.GetConfig(), taskProcessor))
//TODO: should this error be ignored?
if len(task) > 0 {
deliveries <- task
}
}
pool <- struct{}{}
}
}
}()
go func() {
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
}()
for {
select {
case err := <-errorsChan:
return err
case d, open := <-deliveries:
if !open {
return nil
}
if concurrency > 0 {
// get execution slot from pool (blocks until one is available)
select {
case <-b.GetStopChan():
b.requeueMessage(d, taskProcessor)
continue
case <-pool:
}
}
b.processingWG.Add(1)
// Consume the task inside a goroutine so multiple tasks
// can be processed concurrently
go func() {
if err := b.consumeOne(d, taskProcessor); err != nil {
errorsChan <- err
}
b.processingWG.Done()
if concurrency > 0 {
// give slot back to pool
pool <- struct{}{}
}
}()
}
}