Channel

声明Channel

直接声明

  1. var c chan int
  2. fmt.Println(c) // nil
  • 声明类型为 chan int 的变量c, 通道的数据类型是int
  • chan int 的 zero-value 是 nil

通过make关键字声明

  1. c := make(chan int)
  2. fmt.Printf("Type of c is %T\n", c) // chan int
  3. fmt.Printf("Value of c is %v\n", c) // 0xc000070060
  • channel 值实际是指针 (指针类型的元素值总计有切片、channel和map)

向通道写读数据

写数据

  1. c <- data

读数据

从空channel中读数据会出现异常

  1. <- c
  2. val, ok := <- c // 若为false,则说明channel关闭了或者channel为空

使用channel阻塞进程

  • 当向channel中写数据时,程序会阻塞直至该channel中的数据被消费
  • 当从channel中读数据时,若channel为空程序就会阻塞
  1. package main
  2. import "fmt"
  3. func greet(c chan string) {
  4. fmt.Printf("Hello " + <- c + " !")
  5. }
  6. func main() {
  7. fmt.Println("Main go start")
  8. c := make(chan string)
  9. go greet(c)
  10. c <- "YY"
  11. fmt.Println("Main go end")
  12. }

死锁

fatal error: all goroutines are asleep — deadlock!.

  • 向channel中写入数据,并没有其他协程读取该channel的数据
  • 从channel中读取数据,若channel中并没有数据就会一直死锁

For Loop

  1. package main
  2. import "fmt"
  3. func squares(c chan int) {
  4. for i := 0; i<= 9; i++ {
  5. c <- i * i
  6. }
  7. close(c)
  8. }
  9. func main() {
  10. fmt.Println("Main start")
  11. c := make(chan int)
  12. go squares(c)
  13. for val := range c {
  14. fmt.Println(val)
  15. }
  16. fmt.Println("Main end")
  17. }

Channel的缓冲长度和容量

  1. c := make(chen Type, n)
  • len() 函数用来计算channel的缓冲长度
  • cap()函数用来计算channel的容量
  • for range

    用来读取channel中数据,即便channel已经关闭只要channel变量中存在数据,都可以读取数据

  1. 当向channel中写入的数据个数大于channel的容量时,程序就会阻塞
  2. 当程序阻塞的同时系统会调用其他的协程来消费channel
  3. 直至channel中的数据被消费完后,程序才会将执行权移交给刚刚阻塞的进程

单向Channel

只写Channel

  1. roc := make(<-chan int)

只读Channel

  1. soc := make(chan<- int)
  1. package main
  2. import "fmt"
  3. func greet(roc <-chan string) {
  4. fmt.Printf("Hello %v\n", <- roc)
  5. }
  6. func main() {
  7. fmt.Println("Main start ...")
  8. c := make(chan string)
  9. go greet(c)
  10. c <- "YY"
  11. fmt.Println("Main end ...")
  12. }

匿名Goroutine

  1. package main
  2. import "fmt"
  3. func main() {
  4. fmt.Println("Main start ...")
  5. c := make(chan string)
  6. go func(c chan string) {
  7. fmt.Printf("The value is %v\n", <- c)
  8. }(c)
  9. c <- "YY"
  10. fmt.Println("Main end ...")
  11. }

Channel作为Channel数据类型

  1. package main
  2. import "fmt"
  3. func greet(c chan string){
  4. fmt.Printf("Hello %v !\n", <-c)
  5. }
  6. func greeter(cc chan chan string) {
  7. c := make(chan string)
  8. cc <- c
  9. }
  10. func main(){
  11. fmt.Println("Main start ...")
  12. cc := make(chan chan string)
  13. go greeter(cc)
  14. c := <-cc
  15. go greet(c)
  16. c <- "YY"
  17. fmt.Println("Main end ...")
  18. }

Select 关键字

Select VS Switch

  • select 就像 switch 一样根据不同的条件执行不同的分支
  • select 不需要任何输入参数
  • 如果没有default case,select不会阻塞进程
  • 一旦有一个case分支符合条件,程序就会执行
  • 如果所有的分支都符合条件,程序会随机选择一个分支执行
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. var start time.Time
  7. func init() {
  8. start = time.Now()
  9. }
  10. func Service1(c chan string) {
  11. c <- "Hello Service1"
  12. }
  13. func Service2(c chan string) {
  14. c <- "Hello Service2"
  15. }
  16. func main() {
  17. fmt.Println("Main start at ", time.Since(start))
  18. chan1 := make(chan string)
  19. chan2 := make(chan string)
  20. go Service1(chan1)
  21. go Service2(chan2)
  22. select {
  23. case res := <- chan1:
  24. fmt.Println("Response from service1 ", res, time.Since(start))
  25. case res := <- chan2:
  26. fmt.Println("Response from service2 ", res, time.Since(start))
  27. }
  28. fmt.Println("Main end at ", time.Since(start))
  29. }

添加default timeout

  1. ...
  2. select {
  3. case res := <- chan1:
  4. fmt.Println()
  5. case res := <- chan2:
  6. fmt.Println()
  7. case <- time.After(2 * time.Second)
  8. fmt.Println(()
  9. }

空Select

空Select将会导致进程的阻塞

WaitGroup

sync.WaitGroup

当WaitGroup被创建后它的计数(Counter)默认值是0

  • Add 该方法需要一个int类型的参数,通常是1
  • Wait 当计数Counter不为0时,调用该方法时程序就会阻塞
  • Done 每当调用该方法计数Counter就会自动减1
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func Service(wg *sync.WaitGroup, instance int) {
  8. time.Sleep(2 * time.Second)
  9. fmt.Println("Service on instance ", instance)
  10. wg.Done()
  11. }
  12. func main() {
  13. fmt.Println("Main start ...")
  14. var wg sync.WaitGroup
  15. for i := 1; i <= 3; i ++{
  16. wg.Add(1)
  17. go Service(&wg, i)
  18. }
  19. wg.Wait()
  20. fmt.Println("Main End ...")
  21. }

Worker pool

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func sqrWorker(tasks <-chan int, results chan<- int, instance int) {
  7. for num := range tasks {
  8. time.Sleep(time.Millisecond)
  9. fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)
  10. results <- num * num
  11. }
  12. }
  13. func main() {
  14. fmt.Println("[main] main() start")
  15. tasks := make(chan int, 10)
  16. results := make(chan int, 10)
  17. for i := 0; i < 3; i++ {
  18. go sqrWorker(tasks, results, i)
  19. }
  20. for i := 0; i < 5; i++ {
  21. tasks <- i * 2
  22. }
  23. fmt.Printf("[main] wrote 5 tasks")
  24. close(tasks)
  25. for i := 0; i < 5; i ++ {
  26. result := <-results
  27. fmt.Println("[main] result ", i, ":", result)
  28. }
  29. fmt.Println("[main] main stop")
  30. }

使用WaitGroup实现上述功能

  • 避免一些不必要的上下文的切换
  • 但是所有的执行必须要等到wg的计数器减少至0
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "sync"
  6. )
  7. func sqrWorker(wg *sync.WaitGroup, tasks <-chan int, results chan<- int, instance int) {
  8. for num := range tasks {
  9. time.Sleep(time.Millisecond)
  10. fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)
  11. results <- num * num
  12. }
  13. wg.Done()
  14. }
  15. func main() {
  16. fmt.Println("[main] main() start")
  17. tasks := make(chan int, 10)
  18. results := make(chan int, 10)
  19. var wg sync.WaitGroup
  20. for i := 0; i < 3; i++ {
  21. wg.Add(1)
  22. go sqrWorker(&wg, tasks, results, i)
  23. }
  24. for i := 0; i < 5; i++ {
  25. tasks <- i * 2
  26. }
  27. fmt.Printf("[main] wrote 5 tasks")
  28. close(tasks)
  29. wg.Wait()
  30. for i := 0; i < 5; i ++ {
  31. result := <-results
  32. fmt.Println("[main] result ", i, ":", result)
  33. }
  34. fmt.Println("[main] main stop")
  35. }

Mutex 竞态

如果避免不同的goroutines互相竞争资源
**

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var i int
  7. func Worker(wg *sync.WaitGroup, m *sync.Mutex) {
  8. m.Lock()
  9. i = i + 1
  10. m.Unlock()
  11. wg.Done()
  12. }
  13. func main() {
  14. fmt.Println("[main] main start")
  15. var wg sync.WaitGroup
  16. var m sync.Mutex
  17. for i := 1; i<= 1000; i ++ {
  18. wg.Add(1)
  19. go Worker(&wg, &m)
  20. }
  21. wg.Wait()
  22. fmt.Printf("The i is %v", i)
  23. fmt.Println("[main] main end")
  24. }