Channel
声明Channel
直接声明
var c chan intfmt.Println(c) // nil
- 声明类型为 chan int 的变量c, 通道的数据类型是int
- chan int 的 zero-value 是 nil
通过make关键字声明
c := make(chan int)fmt.Printf("Type of c is %T\n", c) // chan intfmt.Printf("Value of c is %v\n", c) // 0xc000070060
- channel 值实际是指针 (指针类型的元素值总计有切片、channel和map)
向通道写读数据
写数据
c <- data
读数据
从空channel中读数据会出现异常
<- c或val, ok := <- c // 若为false,则说明channel关闭了或者channel为空
使用channel阻塞进程
- 当向channel中写数据时,程序会阻塞直至该channel中的数据被消费
- 当从channel中读数据时,若channel为空程序就会阻塞
package mainimport "fmt"func greet(c chan string) {fmt.Printf("Hello " + <- c + " !")}func main() {fmt.Println("Main go start")c := make(chan string)go greet(c)c <- "YY"fmt.Println("Main go end")}
死锁
fatal error: all goroutines are asleep — deadlock!.
- 向channel中写入数据,并没有其他协程读取该channel的数据
- 从channel中读取数据,若channel中并没有数据就会一直死锁
For Loop
package mainimport "fmt"func squares(c chan int) {for i := 0; i<= 9; i++ {c <- i * i}close(c)}func main() {fmt.Println("Main start")c := make(chan int)go squares(c)for val := range c {fmt.Println(val)}fmt.Println("Main end")}
Channel的缓冲长度和容量
c := make(chen Type, n)
- len() 函数用来计算channel的缓冲长度
- cap()函数用来计算channel的容量
for range
用来读取channel中数据,即便channel已经关闭只要channel变量中存在数据,都可以读取数据
- 当向channel中写入的数据个数大于channel的容量时,程序就会阻塞
- 当程序阻塞的同时系统会调用其他的协程来消费channel
- 直至channel中的数据被消费完后,程序才会将执行权移交给刚刚阻塞的进程
单向Channel
只写Channel
roc := make(<-chan int)
只读Channel
soc := make(chan<- int)
package mainimport "fmt"func greet(roc <-chan string) {fmt.Printf("Hello %v\n", <- roc)}func main() {fmt.Println("Main start ...")c := make(chan string)go greet(c)c <- "YY"fmt.Println("Main end ...")}
匿名Goroutine
package mainimport "fmt"func main() {fmt.Println("Main start ...")c := make(chan string)go func(c chan string) {fmt.Printf("The value is %v\n", <- c)}(c)c <- "YY"fmt.Println("Main end ...")}
Channel作为Channel数据类型
package mainimport "fmt"func greet(c chan string){fmt.Printf("Hello %v !\n", <-c)}func greeter(cc chan chan string) {c := make(chan string)cc <- c}func main(){fmt.Println("Main start ...")cc := make(chan chan string)go greeter(cc)c := <-ccgo greet(c)c <- "YY"fmt.Println("Main end ...")}
Select 关键字
Select VS Switch
- select 就像 switch 一样根据不同的条件执行不同的分支
- select 不需要任何输入参数
- 如果没有default case,select不会阻塞进程
- 一旦有一个case分支符合条件,程序就会执行
- 如果所有的分支都符合条件,程序会随机选择一个分支执行
package mainimport ("fmt""time")var start time.Timefunc init() {start = time.Now()}func Service1(c chan string) {c <- "Hello Service1"}func Service2(c chan string) {c <- "Hello Service2"}func main() {fmt.Println("Main start at ", time.Since(start))chan1 := make(chan string)chan2 := make(chan string)go Service1(chan1)go Service2(chan2)select {case res := <- chan1:fmt.Println("Response from service1 ", res, time.Since(start))case res := <- chan2:fmt.Println("Response from service2 ", res, time.Since(start))}fmt.Println("Main end at ", time.Since(start))}
添加default timeout
...select {case res := <- chan1:fmt.Println()case res := <- chan2:fmt.Println()case <- time.After(2 * time.Second)fmt.Println(()}
空Select
空Select将会导致进程的阻塞
WaitGroup
sync.WaitGroup
当WaitGroup被创建后它的计数(Counter)默认值是0
- Add 该方法需要一个int类型的参数,通常是1
- Wait 当计数Counter不为0时,调用该方法时程序就会阻塞
- Done 每当调用该方法计数Counter就会自动减1
package mainimport ("fmt""sync""time")func Service(wg *sync.WaitGroup, instance int) {time.Sleep(2 * time.Second)fmt.Println("Service on instance ", instance)wg.Done()}func main() {fmt.Println("Main start ...")var wg sync.WaitGroupfor i := 1; i <= 3; i ++{wg.Add(1)go Service(&wg, i)}wg.Wait()fmt.Println("Main End ...")}
Worker pool
package mainimport ("fmt""time")func sqrWorker(tasks <-chan int, results chan<- int, instance int) {for num := range tasks {time.Sleep(time.Millisecond)fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)results <- num * num}}func main() {fmt.Println("[main] main() start")tasks := make(chan int, 10)results := make(chan int, 10)for i := 0; i < 3; i++ {go sqrWorker(tasks, results, i)}for i := 0; i < 5; i++ {tasks <- i * 2}fmt.Printf("[main] wrote 5 tasks")close(tasks)for i := 0; i < 5; i ++ {result := <-resultsfmt.Println("[main] result ", i, ":", result)}fmt.Println("[main] main stop")}
使用WaitGroup实现上述功能
- 避免一些不必要的上下文的切换
- 但是所有的执行必须要等到wg的计数器减少至0
package mainimport ("fmt""time""sync")func sqrWorker(wg *sync.WaitGroup, tasks <-chan int, results chan<- int, instance int) {for num := range tasks {time.Sleep(time.Millisecond)fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)results <- num * num}wg.Done()}func main() {fmt.Println("[main] main() start")tasks := make(chan int, 10)results := make(chan int, 10)var wg sync.WaitGroupfor i := 0; i < 3; i++ {wg.Add(1)go sqrWorker(&wg, tasks, results, i)}for i := 0; i < 5; i++ {tasks <- i * 2}fmt.Printf("[main] wrote 5 tasks")close(tasks)wg.Wait()for i := 0; i < 5; i ++ {result := <-resultsfmt.Println("[main] result ", i, ":", result)}fmt.Println("[main] main stop")}
Mutex 竞态
如果避免不同的goroutines互相竞争资源
**
package mainimport ("fmt""sync")var i intfunc Worker(wg *sync.WaitGroup, m *sync.Mutex) {m.Lock()i = i + 1m.Unlock()wg.Done()}func main() {fmt.Println("[main] main start")var wg sync.WaitGroupvar m sync.Mutexfor i := 1; i<= 1000; i ++ {wg.Add(1)go Worker(&wg, &m)}wg.Wait()fmt.Printf("The i is %v", i)fmt.Println("[main] main end")}
