Channel
声明Channel
直接声明
var c chan int
fmt.Println(c) // nil
- 声明类型为 chan int 的变量c, 通道的数据类型是int
- chan int 的 zero-value 是 nil
通过make关键字声明
c := make(chan int)
fmt.Printf("Type of c is %T\n", c) // chan int
fmt.Printf("Value of c is %v\n", c) // 0xc000070060
- channel 值实际是指针 (指针类型的元素值总计有切片、channel和map)
向通道写读数据
写数据
c <- data
读数据
从空channel中读数据会出现异常
<- c
或
val, ok := <- c // 若为false,则说明channel关闭了或者channel为空
使用channel阻塞进程
- 当向channel中写数据时,程序会阻塞直至该channel中的数据被消费
- 当从channel中读数据时,若channel为空程序就会阻塞
package main
import "fmt"
func greet(c chan string) {
fmt.Printf("Hello " + <- c + " !")
}
func main() {
fmt.Println("Main go start")
c := make(chan string)
go greet(c)
c <- "YY"
fmt.Println("Main go end")
}
死锁
fatal error: all goroutines are asleep — deadlock!.
- 向channel中写入数据,并没有其他协程读取该channel的数据
- 从channel中读取数据,若channel中并没有数据就会一直死锁
For Loop
package main
import "fmt"
func squares(c chan int) {
for i := 0; i<= 9; i++ {
c <- i * i
}
close(c)
}
func main() {
fmt.Println("Main start")
c := make(chan int)
go squares(c)
for val := range c {
fmt.Println(val)
}
fmt.Println("Main end")
}
Channel的缓冲长度和容量
c := make(chen Type, n)
- len() 函数用来计算channel的缓冲长度
- cap()函数用来计算channel的容量
for range
用来读取channel中数据,即便channel已经关闭只要channel变量中存在数据,都可以读取数据
- 当向channel中写入的数据个数大于channel的容量时,程序就会阻塞
- 当程序阻塞的同时系统会调用其他的协程来消费channel
- 直至channel中的数据被消费完后,程序才会将执行权移交给刚刚阻塞的进程
单向Channel
只写Channel
roc := make(<-chan int)
只读Channel
soc := make(chan<- int)
package main
import "fmt"
func greet(roc <-chan string) {
fmt.Printf("Hello %v\n", <- roc)
}
func main() {
fmt.Println("Main start ...")
c := make(chan string)
go greet(c)
c <- "YY"
fmt.Println("Main end ...")
}
匿名Goroutine
package main
import "fmt"
func main() {
fmt.Println("Main start ...")
c := make(chan string)
go func(c chan string) {
fmt.Printf("The value is %v\n", <- c)
}(c)
c <- "YY"
fmt.Println("Main end ...")
}
Channel作为Channel数据类型
package main
import "fmt"
func greet(c chan string){
fmt.Printf("Hello %v !\n", <-c)
}
func greeter(cc chan chan string) {
c := make(chan string)
cc <- c
}
func main(){
fmt.Println("Main start ...")
cc := make(chan chan string)
go greeter(cc)
c := <-cc
go greet(c)
c <- "YY"
fmt.Println("Main end ...")
}
Select 关键字
Select VS Switch
- select 就像 switch 一样根据不同的条件执行不同的分支
- select 不需要任何输入参数
- 如果没有default case,select不会阻塞进程
- 一旦有一个case分支符合条件,程序就会执行
- 如果所有的分支都符合条件,程序会随机选择一个分支执行
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func Service1(c chan string) {
c <- "Hello Service1"
}
func Service2(c chan string) {
c <- "Hello Service2"
}
func main() {
fmt.Println("Main start at ", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go Service1(chan1)
go Service2(chan2)
select {
case res := <- chan1:
fmt.Println("Response from service1 ", res, time.Since(start))
case res := <- chan2:
fmt.Println("Response from service2 ", res, time.Since(start))
}
fmt.Println("Main end at ", time.Since(start))
}
添加default timeout
...
select {
case res := <- chan1:
fmt.Println()
case res := <- chan2:
fmt.Println()
case <- time.After(2 * time.Second)
fmt.Println(()
}
空Select
空Select将会导致进程的阻塞
WaitGroup
sync.WaitGroup
当WaitGroup被创建后它的计数(Counter)默认值是0
- Add 该方法需要一个int类型的参数,通常是1
- Wait 当计数Counter不为0时,调用该方法时程序就会阻塞
- Done 每当调用该方法计数Counter就会自动减1
package main
import (
"fmt"
"sync"
"time"
)
func Service(wg *sync.WaitGroup, instance int) {
time.Sleep(2 * time.Second)
fmt.Println("Service on instance ", instance)
wg.Done()
}
func main() {
fmt.Println("Main start ...")
var wg sync.WaitGroup
for i := 1; i <= 3; i ++{
wg.Add(1)
go Service(&wg, i)
}
wg.Wait()
fmt.Println("Main End ...")
}
Worker pool
package main
import (
"fmt"
"time"
)
func sqrWorker(tasks <-chan int, results chan<- int, instance int) {
for num := range tasks {
time.Sleep(time.Millisecond)
fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)
results <- num * num
}
}
func main() {
fmt.Println("[main] main() start")
tasks := make(chan int, 10)
results := make(chan int, 10)
for i := 0; i < 3; i++ {
go sqrWorker(tasks, results, i)
}
for i := 0; i < 5; i++ {
tasks <- i * 2
}
fmt.Printf("[main] wrote 5 tasks")
close(tasks)
for i := 0; i < 5; i ++ {
result := <-results
fmt.Println("[main] result ", i, ":", result)
}
fmt.Println("[main] main stop")
}
使用WaitGroup实现上述功能
- 避免一些不必要的上下文的切换
- 但是所有的执行必须要等到wg的计数器减少至0
package main
import (
"fmt"
"time"
"sync"
)
func sqrWorker(wg *sync.WaitGroup, tasks <-chan int, results chan<- int, instance int) {
for num := range tasks {
time.Sleep(time.Millisecond)
fmt.Printf("[worker %v] Sending result by worker %v\n", instance, instance)
results <- num * num
}
wg.Done()
}
func main() {
fmt.Println("[main] main() start")
tasks := make(chan int, 10)
results := make(chan int, 10)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go sqrWorker(&wg, tasks, results, i)
}
for i := 0; i < 5; i++ {
tasks <- i * 2
}
fmt.Printf("[main] wrote 5 tasks")
close(tasks)
wg.Wait()
for i := 0; i < 5; i ++ {
result := <-results
fmt.Println("[main] result ", i, ":", result)
}
fmt.Println("[main] main stop")
}
Mutex 竞态
如果避免不同的goroutines互相竞争资源
**
package main
import (
"fmt"
"sync"
)
var i int
func Worker(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
i = i + 1
m.Unlock()
wg.Done()
}
func main() {
fmt.Println("[main] main start")
var wg sync.WaitGroup
var m sync.Mutex
for i := 1; i<= 1000; i ++ {
wg.Add(1)
go Worker(&wg, &m)
}
wg.Wait()
fmt.Printf("The i is %v", i)
fmt.Println("[main] main end")
}