开启并发模式

Cover image

在这篇文章中,我将介绍在 Go 中使用基本并发模式和原生原语来构建并发应用程序的一些最佳实践。 模式本身适用于任何语言,但对于这些示例,我们将使用 Go。

可以下载本文的源码配合阅读。

  1. git clone https://github.com/benjivesterby/may2021-triangle-meetup.git

流水线模式

流水线模式 1 2 通常由若干个阶段组成,每个阶段都通过一系列 channel 连接。第一阶段是数据来源,最后阶段是数据汇总。数据流水线就是很好的例子,第一步挖掘数据,接下来清洗数据,最后一步将清洗的数据存储到数据库中。

A diagram showing an example Pipeline Example Pipeline

如上图所示,第一阶段是数据的来源,最后阶段为数据汇总。下面是流水线模式实战的示例代码。

  1. // Example of a pipeline pattern
  2. func main() {
  3. // Call each function passing the output of the previous function
  4. // as the input to the next function
  5. d := []data{
  6. // ... data ...
  7. }
  8. // Create the pipeline
  9. sink(sanitize(source(d)))
  10. }
  11. func source(in []data) <-chan data {
  12. out := make(chan data)
  13. go func(in []data) {
  14. defer close(out)
  15. for _, d := range data {
  16. // Load data into the front of the pipeline
  17. out <- d
  18. }
  19. }(in)
  20. return out
  21. }
  22. func sanitize(in <-chan data) <-chan data {
  23. out := make(chan data)
  24. go func(in <-chan data, out chan<-data) {
  25. defer close(out)
  26. for {
  27. d, ok := <- in
  28. if !ok {
  29. return
  30. }
  31. // ... Do some work
  32. // push the data out
  33. out <- d
  34. }
  35. }(in, out)
  36. return out
  37. }
  38. func sink(in <-chan data) {
  39. for {
  40. d, ok := <- in
  41. if !ok {
  42. return
  43. }
  44. // ... Process the sanitized data
  45. }
  46. }

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 pipeline 目录。在项目根目录下运行下面的命令来执行流水线示例。

  1. go run patterns/pipeline/main.go

流水线模式通过一系列 goroutine 组成,每个 goroutine 由一个 channel 传入数据,另一个 channel 输出数据。使用此模式,你可以创建任何大小和复杂性的流水线。

扇出模式

扇出模式是指允许多个协程来使用来自单个数据源的的模式。当你需要对多协程进行的大数据处理做负载均衡时,该模式很有用。

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 fan-out 目录。在项目根目录下运行下面的命令来执行扇出示例。

  1. go run patterns/fanout/main.go

A diagram showing an example Fan-Out Example Fan-Out

下面是扇出模式的示例代码,其中数据被一系列的工作协程处理。

  1. // Example of a fan-out pattern for processing data in parallel
  2. func main() {
  3. dataPipeline := make(chan data)
  4. // Create three worker routines
  5. go process(dataPipeline)
  6. go process(dataPipeline)
  7. go process(dataPipeline)
  8. // Load the data into the pipeline
  9. for _, d := range ReadData() {
  10. dataPipeline <- d
  11. }
  12. // Close the pipeline when all data has been read in
  13. // NOTE: This will immediately close the application regardless
  14. // of whether the workers have completed their processing.
  15. // See the section on Best Practices at the end of the post
  16. // for more information on how to handle this.
  17. close(dataPipeline)
  18. }
  19. func ReadData() []data {
  20. // ...
  21. }
  22. func process(in <-chan data) {
  23. for {
  24. d, ok := <- in
  25. if !ok {
  26. return
  27. }
  28. // ... Do some work
  29. }
  30. }

上面例子中 process 函数被调用了三次,每个 goroutine 各行其事。channel in 作为入参传给每个 goroutine ,并通过一个 for 循环读取数据。当数据读取完毕时,channel in 被关闭。

复制模式

我们看到扇出模式不仅可以用来并行处理数据,还可以用作多线程复制数据。

  1. // Example of a fan-out replication pattern
  2. func main() {
  3. dataPipeline := make(chan data)
  4. // Pass the write-only channels from the three proc calls to the fan-out
  5. go replicate(dataPipeline, proc1(), proc2(), proc3())
  6. // Load the data into the pipeline
  7. for _, d := range ReadData() {
  8. dataPipeline <- d
  9. }
  10. }
  11. func proc1() chan<- data {/* ... */}
  12. func proc2() chan<- data {/* ... */}
  13. func proc3() chan<- data {/* ... */}
  14. func replicate(in <-chan data, outgoing ...chan<- data) {
  15. for {
  16. d, ok := <- in // Read from the input channel
  17. if !ok {
  18. return
  19. }
  20. // Replicate the data to each of the outgoing channels
  21. for _, out := range outgoing {
  22. out <- d
  23. }
  24. }
  25. }

当使用复制模式的时候请注意数据的类型。特别是使用指针时候,因为复制器不是在复制数据而是在传递指针。

上面是一个扇出模式的示例代码,replicate 函数通过可变参数传入了三个 channel 被调用。channel in 提供了原始数据,并将其复制到输出 channel。

类型扇出

最后一个扇出模型我们将介绍 类型扇出 。当处理 interface{} 类型的 channel 时非常有用。此模式允许根据数据类型将数据定向到适当的处理器。

  1. // Example of a type fan-out pattern from the github project
  2. // for this post
  3. func TypeFan(in <-chan interface{}) (
  4. <-chan int,
  5. <-chan string,
  6. <-chan []byte,
  7. ) {
  8. ints := make(chan int)
  9. strings := make(chan string)
  10. bytes := make(chan []byte)
  11. go func(
  12. in <-chan interface{},
  13. ints chan<- int,
  14. strings chan<- string,
  15. bytes chan<- []byte,
  16. ) {
  17. defer close(ints)
  18. defer close(strings)
  19. defer close(bytes)
  20. for {
  21. data, ok := <-in
  22. if !ok {
  23. return
  24. }
  25. // Type Switch on the data coming in and
  26. // push the data to the appropriate channel
  27. switch value := data.(type) {
  28. case nil: // Special case in type switch
  29. // Do nothing
  30. case int:
  31. ints <- value
  32. case string:
  33. strings <- value
  34. case []byte:
  35. bytes <- value
  36. default:
  37. fmt.Printf("%T is an unsupported type", data)
  38. }
  39. }
  40. }(in, ints, strings, bytes)
  41. return ints, strings, bytes
  42. }

上面的例子展示了如何接收一个空的 interface(即 interface{})并使用类型开关来决定将数据发送到哪个 channel。

扇入/合并器模式

使用扇入模式时,数据会从多个 channel 读取后合并到一个 channel 输出。3 扇入模式与扇出模式刚好相反。

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 fan-in 目录。在项目根目录下运行下面的命令来执行扇入示例。

  1. go run patterns/fanin/main.go

A diagram showing an example Fan-In Example Fan-In

下面是一个扇入模式的代码示例,数据被多个工作协程挖掘并全部集中到一个单独的 channel 中。

  1. // Example of a fan-in pattern mining data in
  2. // parallel, but processing it all synchronously
  3. func main() {
  4. miner1 := mine()
  5. miner2 := mine()
  6. miner3 := mine()
  7. // Take miner data and consolidate it into a single channel
  8. out := consolidate(miner1, miner2, miner3)
  9. // Process the data
  10. for {
  11. data, ok := <- out
  12. if !ok {
  13. return
  14. }
  15. // ... Do some work
  16. }
  17. }
  18. func consolidate(miners ...<-chan data) <-chan data {
  19. out := make(chan data)
  20. // Iterate over the miners and start a routine for
  21. // consuming each one and merging them into the output
  22. for _, in := range miners {
  23. go func(in <-chan data, out chan<- data) {
  24. for {
  25. d, ok := <-in // Pull data from the miner
  26. if !ok {
  27. return
  28. }
  29. out <- d // Send the data to the output
  30. }
  31. }(in, out)
  32. }
  33. return out
  34. }
  35. func mine() <-chan data {
  36. out := make(chan data)
  37. go func() {
  38. // ... populate the channel with mined data
  39. }()
  40. return out
  41. }

上面的例子利用扇入模式从多个模拟数据挖掘器中整合输入的数据。

组合和嵌套模式

这些模式中的每一个都可以组合起来以创造更复杂的模式。这会非常有用,因为大多数应用不会只使用一种并发模式。

下面是一个将所有模式组合到一个请求响应生命周期的例子。该例子中,数据来自单一来源,扇出到多个流水线,最后扇入成单个响应返回给用户。

A diagram showing an example of a request-response life-cycle

当构建应用程序时,我建议先构建一个设计图来帮助概念化这些并发设计元素。我真的很喜欢用 diagrams.net 来画设计图。构建这些设计图的过程有助于巩固最终产品并使设计更易于理解。 将设计作为流程的一部分也将有助于将设计出售给其他工程师和领导层。

尽可能使用 Go 的原生并发原语

最佳实践

虽然主要使用 Go 的并发原语来管理 Go 应用程序中的并发被认为是最佳实践,但是在某些情况下,需要使用 sync 包来辅助管理并发。

一个很好的例子就是当你实现了类似于io.Closer 时需要确保所有协程都退出了。例如,如果你的代码产生了 N 个协程并且希望调用 io.Closer 方法时所有的协程都正确退出,你可以使用 sync.WaitGroup 来等待所有的协程直到它们被关闭。

执行此操作方法如下所示。

  1. // Example of using a wait group to
  2. // ensure all routines are properly exited
  3. type mytype struct {
  4. ctx context.Context
  5. cancel context.CancelFunc
  6. wg sync.WaitGroup
  7. wgMu sync.Mutex
  8. }
  9. // DoSomething spawns a go routine
  10. // every time it's called
  11. func (t *mytype) DoSomething() {
  12. // Increment the waitgroup to ensure
  13. // Close properly blocks
  14. t.wgMu.Lock()
  15. t.wg.Add(1)
  16. t.wgMu.Unlock()
  17. go func() {
  18. // Decrement the waitgroup
  19. // when the routine exits
  20. defer t.wg.Done()
  21. // ... do something
  22. }()
  23. }
  24. func (t *mytype) Close() error {
  25. // Cancel the internal context
  26. t.cancel()
  27. // Wait for all routines to exit
  28. t.wgMu.Lock()
  29. t.wg.Wait()
  30. t.wgMu.Unlock()
  31. return nil
  32. }

上面的代码有几个重点。首先,使用 sync.WaitGroup来增加和减少正在运行的协程数量。其次,使用 sync.Mutex 来确保只有同时只有一个协程在修改 sync.WaitGroup.Wait() 方法不需要 mutex )

单击此处阅读 Leo Lara 对此的详细解释 4

有关需要使用 sync 包的情况的功能示例,请参阅 Plex 库

泛型 (即将到来!)

随着 Go 1.18 中泛型的引入,这些模式使用起来会变得更加容易,我将在下一篇文章中介绍。


  1. Pipelines - Concurrency in Go by Katherine Cox-Buday - Page 100 ↩︎

  2. Go Concurrency Patterns: Pipelines and cancellation ↩︎

  3. Multiplexing - Concurrency in Go by Katherine Cox-Buday - Page 117 ↩︎

  4. Closing a Go channel written by several goroutines ↩︎