Broker

broker 这个基本上已经成为 Pub/Sub 消息队列的术语,这个任务调度系统就是通过消息队列进行任务分发的,这里的 Broker 就是代表消息队列的实例了。
RichardKnop/machinery - 图1
图一:iface broker

通过 interface 的方式,machinery 支持多种类型的消息队列,ampq(高级消息队列协议),Google Cloud Pub/Sub,AWS SQS,redis。我们这里以 redis 为例,研究一下 broker 的实现。

这里的 redis borker 并没有使用 redis 提供的 Pub/Sub 功能,而是使用的 LIST 和 ZSET,其中比较复杂的是 DelayedTasks,其实就是在 ZSET 中按照到期时间进行排序,时间到了之后弹出再添加到 LIST 中。
RichardKnop/machinery - 图2
图二:redis broker

另一部分是 Consumer 的实现,这里的重点是对并发度的限制,这里三个 channel 的容量都是并发度的大小。
RichardKnop/machinery - 图3
图三:redis StartConsuming

  1. for i := 0; i < concurrency; i++ {
  2. pool <- struct{}{}
  3. }
  4. go func() {
  5. log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")
  6. for {
  7. select {
  8. // A way to stop this goroutine from b.StopConsuming
  9. case <-b.GetStopChan():
  10. close(deliveries)
  11. return
  12. case <-pool:
  13. select {
  14. case <-b.GetStopChan():
  15. close(deliveries)
  16. return
  17. default:
  18. }
  19. if taskProcessor.PreConsumeHandler() {
  20. task, _ := b.nextTask(getQueue(b.GetConfig(), taskProcessor))
  21. //TODO: should this error be ignored?
  22. if len(task) > 0 {
  23. deliveries <- task
  24. }
  25. }
  26. pool <- struct{}{}
  27. }
  28. }
  29. }()
  1. go func() {
  2. for i := 0; i < concurrency; i++ {
  3. pool <- struct{}{}
  4. }
  5. }()
  6. for {
  7. select {
  8. case err := <-errorsChan:
  9. return err
  10. case d, open := <-deliveries:
  11. if !open {
  12. return nil
  13. }
  14. if concurrency > 0 {
  15. // get execution slot from pool (blocks until one is available)
  16. select {
  17. case <-b.GetStopChan():
  18. b.requeueMessage(d, taskProcessor)
  19. continue
  20. case <-pool:
  21. }
  22. }
  23. b.processingWG.Add(1)
  24. // Consume the task inside a goroutine so multiple tasks
  25. // can be processed concurrently
  26. go func() {
  27. if err := b.consumeOne(d, taskProcessor); err != nil {
  28. errorsChan <- err
  29. }
  30. b.processingWG.Done()
  31. if concurrency > 0 {
  32. // give slot back to pool
  33. pool <- struct{}{}
  34. }
  35. }()
  36. }
  37. }