一个地鼠(gopher)工厂

goroutine和并发 - 图1

goroutine

  • 在 Go 中,独立的任务叫做 goroutine
    • 虽然 goroutine 与其它语言中的协程、进程、线程都有相似之处,但 goroutine 和它们并不完全相同
    • Goroutine 创建效率非常高
    • Go 能直截了当的协同多个并发(concurrent)操作
  • 在某些语言中,将顺序式代码转化为并发式代码需要做大量修改
  • 在 Go 里,无需修改现有顺序式的代码,就可以通过 goroutine 以并发的方式运行任意数量的任务。

启动 goroutine

  • 只需在调用前面加一个 go 关键字。
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. go sleepyGopher() //分支线路
  8. time.Sleep(4 * time.Second) //主线路
  9. }
  10. func sleepyGopher() {
  11. time.Sleep(3 * time.Second)
  12. fmt.Println("... snore ...")
  13. }

不止一个 goroutine

  • 每次使用 go 关键字都会产生一个新的 goroutine。
  • 表面上看,goroutine 似乎在同时运行,但由于计算机处理单元有限,其实技术上来说,这些 goroutine 不是真的在同时运行。
    • 计算机处理器会使用“分时”技术,在多个 goroutine 上轮流花费一些时间。
    • 在使用 goroutine 时,各个 goroutine 的执行顺序无法确定。
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. for i := 0; i < 5; i++ {
  8. go sleepyGopher()
  9. }
  10. time.Sleep(4 * time.Second)
  11. }
  12. func sleepyGopher() {
  13. time.Sleep(3 * time.Second)
  14. fmt.Println("... snore ...")
  15. }

goroutine 的参数

  • 向 goroutine 传递参数就跟向函数传递参数一样,参数都是按值传递的(传入的是副本)
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. for i := 0; i < 5; i++ {
  8. go sleepyGopher(i)
  9. }
  10. time.Sleep(4 * time.Second)
  11. }
  12. func sleepyGopher(id int) {
  13. time.Sleep(3 * time.Second)
  14. fmt.Println("... ", id, " snore ...")
  15. }

通道 channel

  • 通道(channel)可以在多个 goroutine 之间安全的传值。
  • 通道可以用作变量、函数参数、结构体字段…
  • 创建通道用 make 函数,并指定其传输数据的类型
    • c := make(chan int)

通道 channel 发送、接收

  • 使用左箭头操作符 <- 向通道发送值 或 从通道接收值
    • 向通道发送值:c <- 99
    • 从通道接收值:r := <- c
  • 发送操作会等待直到另一个 goroutine 尝试对该通道进行接收操作为止。
    • 执行发送操作的 goroutine 在等待期间将无法执行其它操作
    • 未在等待通道操作的 goroutine 仍然可以继续自由的运行
  • 执行接收操作的 goroutine 将等待直到另一个 goroutine 尝试向该通道进行发送操作为止。
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. c := make(chan int)
  8. for i := 0; i < 5; i++ {
  9. go sleepyGopher(i, c)
  10. }
  11. for i := 0; i < 5; i++ {
  12. gopherID := <-c
  13. fmt.Println("gopher ", gopherID, " has finished sleeping")
  14. }
  15. }
  16. func sleepyGopher(id int, c chan int) {
  17. time.Sleep(3 * time.Second)
  18. fmt.Println("... ", id, " snore ...")
  19. c <- id
  20. }

goroutine和并发 - 图2

使用 select 处理多个通道

  • 等待不同类型的值。
  • time.After 函数,返回一个通道,该通道在指定时间后会接收到一个值(发送该值的 goroutine 是 Go 运行时的一部分)。
  • select 和 switch 有点像。
    • 该语句包含的每个 case 都持有一个通道,用来发送或接收数据。
    • select 会等待直到某个 case 分支的操作就绪,然后就会执行该 case 分支。
  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "time"
  6. )
  7. func main() {
  8. c := make(chan int)
  9. for i := 0; i < 5; i++ {
  10. go sleepyGopher(i, c)
  11. }
  12. timeout := time.After(2 * time.Second)
  13. for i := 0; i < 5; i++ {
  14. select {
  15. case gopherID := <-c:
  16. fmt.Println("gopher ", gopherID, " has finished sleeping")
  17. case <-timeout:
  18. fmt.Println("my patience ran out")
  19. return
  20. }
  21. }
  22. }
  23. func sleepyGopher(id int, c chan int) {
  24. time.Sleep(time.Duration(rand.Intn(4000)) * time.Millisecond)
  25. c <- id
  26. }
  • 注意:即使已经停止等待 goroutine,但只要 main 函数还没返回,仍在运行的 goroutine 将会继续占用内存。

select 语句

  • select 语句在不包含任何 case 的情况下将永远等下去。

nil 通道

  • 如果不使用 make 初始化通道,那么通道变量的值就是 nil(零值)
  • 对 nil 通道进行发送或接收不会引起 panic,但会导致永久阻塞。
  • 对 nil 通道执行 close 函数,那么会引起 panic
  • nil 通道的用处:
    • 对于包含 select 语句的循环,如果不希望每次循环都等待 select 所涉及的所有通道,那么可以先将某些通道设为 nil,等到发送值准备就绪之后,再将通道变成一个非 nil 值并执行发送操作。

阻塞和死锁

  • 当 goroutine 在等待通道的发送或接收时,我们就说它被阻塞了。
  • 除了 goroutine 本身占用少量的内存外,被阻塞的 goroutine 并不消耗任何其它资源。
    • goroutine 静静的停在那里,等待导致其阻塞的事情来解除阻塞。
  • 当一个或多个 goroutine 因为某些永远无法发生的事情被阻塞时,我们称这种情况为死锁。而出现死锁的程序通常会崩溃或挂起。

地鼠装配线

goroutine和并发 - 图3

  • Go 允许在没有值可供发送的情况下通过 close 函数关闭通道
    • 例如 close(c)
  • 通道被关闭后无法写入任何值,如果尝试写入将引发 panic。
  • 尝试读取被关闭的通道会获得与通道类型对应的零值。
  • 注意:如果循环里读取一个已关闭的通道,并没检查通道是否关闭,那么该循环可能会一直运转下去,耗费大量 CPU 时间
  • 执行以下代码可得知通道是否被关闭:
    • v, ok := <- c
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. func main() {
  7. c0 := make(chan string)
  8. c1 := make(chan string)
  9. go sourceGopher(c0)
  10. go filterGopher(c0, c1)
  11. printGopher(c1)
  12. }
  13. func sourceGopher(downstream chan string) {
  14. for _, v := range []string{"hello world", "a bad apple", "goodbye all"} {
  15. downstream <- v
  16. }
  17. downstream <- ""
  18. }
  19. func filterGopher(upstream, downstream chan string) {
  20. for {
  21. item := <-upstream
  22. if item == "" {
  23. downstream <- ""
  24. return
  25. }
  26. if !strings.Contains(item, "bad") {
  27. downstream <- item
  28. }
  29. }
  30. }
  31. func printGopher(upstream chan string) {
  32. for {
  33. v := <-upstream
  34. if v == "" {
  35. return
  36. }
  37. fmt.Println(v)
  38. }
  39. }
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. func main() {
  7. c0 := make(chan string)
  8. c1 := make(chan string)
  9. go sourceGopher(c0)
  10. go filterGopher(c0, c1)
  11. printGopher(c1)
  12. }
  13. func sourceGopher(downstream chan string) {
  14. for _, v := range []string{"hello world", "a bad apple", "goodbye all"} {
  15. downstream <- v
  16. }
  17. close(downstream)
  18. }
  19. func filterGopher(upstream, downstream chan string) {
  20. for {
  21. item, ok := <-upstream
  22. if !ok {
  23. close(downstream)
  24. return
  25. }
  26. if !strings.Contains(item, "bad") {
  27. downstream <- item
  28. }
  29. }
  30. }
  31. func printGopher(upstream chan string) {
  32. for v := range upstream {
  33. fmt.Println(v)
  34. }
  35. }
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. func main() {
  7. c0 := make(chan string)
  8. c1 := make(chan string)
  9. go sourceGopher(c0)
  10. go filterGopher(c0, c1)
  11. printGopher(c1)
  12. }
  13. func sourceGopher(downstream chan string) {
  14. for _, v := range []string{"hello world", "a bad apple", "goodbye all"} {
  15. downstream <- v
  16. }
  17. close(downstream)
  18. }
  19. func filterGopher(upstream, downstream chan string) {
  20. for item := range upstream {
  21. if !strings.Contains(item, "bad") {
  22. downstream <- item
  23. }
  24. }
  25. close(downstream)
  26. }
  27. func printGopher(upstream chan string) {
  28. for v := range upstream {
  29. fmt.Println(v)
  30. }
  31. }

常用模式

  • 从通道里面读取值,直到它关闭为止。
    • 可以使用 range 关键字达到该目的

作业题

  1. 编写一个流水线部件(一个 goroutine),他需要记住前面出现的所有值,并且只有在值之前从未出现过的情况下才会将值传递至流水线的下一个阶段。假定第一个值永远不是空字符串。
  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. c0 := make(chan string)
  7. c1 := make(chan string)
  8. go sourceGopher(c0)
  9. go removeDuplicates(c0, c1)
  10. printGopher(c1)
  11. }
  12. func sourceGopher(downstream chan string) {
  13. for _, v := range []string{"a", "b", "b", "c", "d", "d", "d", "e"} {
  14. downstream <- v
  15. }
  16. close(downstream)
  17. }
  18. func removeDuplicates(upstream, downstream chan string) {
  19. prev := ""
  20. for v := range upstream {
  21. if v != prev {
  22. downstream <- v
  23. prev = v
  24. }
  25. }
  26. close(downstream)
  27. }
  28. func printGopher(upstream chan string) {
  29. for v := range upstream {
  30. fmt.Println(v)
  31. }
  32. }
  1. 编写一个流水线部件,它接收字符串并将它们拆分成单词,然后向流水线的下一阶段一个接一个的发送这些单词。
    • 可以使用 strings.Fields 函数
  1. package main
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. func main() {
  7. c0 := make(chan string)
  8. c1 := make(chan string)
  9. go sourceGopher(c0)
  10. go splitWords(c0, c1)
  11. printGopher(c1)
  12. }
  13. func sourceGopher(downstream chan string) {
  14. for _, v := range []string{"hello world", "a bad apple", "goodbye all"} {
  15. downstream <- v
  16. }
  17. close(downstream)
  18. }
  19. func splitWords(upstream, downstream chan string) {
  20. for v := range upstream {
  21. for _, word := range strings.Fields(v) {
  22. downstream <- word
  23. }
  24. }
  25. close(downstream)
  26. }
  27. func printGopher(upstream chan string) {
  28. for v := range upstream {
  29. fmt.Println(v)
  30. }
  31. }