goroutine(协程)

基本介绍

进程和线程介绍

  1. 进程就是程序程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位
  2. 线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位。
  3. 一个进程可以创建核销毁多个线程,同一个进程中的多个线程可以并发执行。
  4. 一个程序至少有一个进程,一个进程至少有一个线程

    并发和并行

    多线程程序在单核上运行,就是并发
  • 多个任务作用在一个cup
  • 从微观的角度看,在一个时间点上,其实只有一个任务在执行

多线程程序在多核上运行,就是并行

  • 多个任务作用在多个cpu
  • 从微观的角度看,在一个时间点上,就是多个任务在同时执行

    Go 协程和Go 主线程

    Go 主线程(有程序员直接称为线程/也可以理解成进程): 一个 Go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程(编译器做优化)
    Go 协程的特点:

  • 有独立的栈空间

  • 共享程序堆空间
  • 调度由用户控制
  • 协程是轻量级的线程

goroutine和 channel - 图1

快速入门

请编写一个程序,完成如下功能:

  • 在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 1 秒输出 “hello,world”
  • 在主线程中也每隔一秒输出”hello,golang”, 输出 10 次后,退出程序
  • 要求主线程和 goroutine 同时执行
  • 画出主线程和协程执行流程图
  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. )
  7. // 在主线程(可以理解成进程)中,开启一个goroutine, 该协程每隔1秒输出 "hello,world"
  8. // 在主线程中也每隔一秒输出"hello,golang", 输出10次后,退出程序
  9. // 要求主线程和goroutine同时执行
  10. //编写一个函数,每隔1秒输出 "hello,world"
  11. func test() {
  12. for i := 1; i <= 10; i++ {
  13. fmt.Println("test () hello,world " + strconv.Itoa(i))
  14. time.Sleep(time.Second)
  15. }
  16. }
  17. func main() {
  18. go test() // 开启了一个协程
  19. for i := 1; i <= 10; i++ {
  20. fmt.Println(" main() hello,golang" + strconv.Itoa(i))
  21. time.Sleep(time.Second)
  22. }
  23. }
 main() hello,golang1
test () hello,world 1
test () hello,world 2
 main() hello,golang2
 main() hello,golang3
test () hello,world 3
 main() hello,golang4
test () hello,world 4
test () hello,world 5
 main() hello,golang5
 main() hello,golang6
test () hello,world 6
test () hello,world 7
 main() hello,golang7
test () hello,world 8
 main() hello,golang8
test () hello,world 9
 main() hello,golang9
test () hello,world 10
 main() hello,golang10

goroutine和 channel - 图2
快速入门小结

  1. 主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。
  2. 协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
  3. Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 Golang 在并发上的优势了

    调度模型

    MPG 模式基本介绍

    M:操作系统的主线程(是物理线程)
    P:协程执行需要的上下文
    G:协程

    MPG 模式运行的状态

    goroutine和 channel - 图3

  4. 分成两个部分来看

  5. 原来的情况是M0主线程正在执行G0协程,另外有三个协程在队列等待
  6. 如果G0协程阻塞,比如读取文件或者数据库等
  7. 这时就会创建M1主线程(也可能是从已有的线程池中取出M1),并且将等待的3个协程挂到M1下开始执行,M0的主线程下的G0仍然执行文件io的读写。
  8. 这样的MPG调度模式,可以既让G0执行,同时也不会让队列的其它协程一直阻塞,仍然可以并发/并行执行
  9. 等到G0不阻塞了,M0会被放到空闲的主线程继续执行(从已有的线程池中取),同时G0又会被唤醒。

    设置 Golang 运行的cpu 数

    为了充分了利用多 cpu 的优势,在 Golang 程序中,设置运行的 cpu 数目
    go1.8前需要设置一下运行的 cpu 数目,go1.8后默认让程序运行在多核上了,可以不用设置 ```go package main import ( “runtime” “fmt” )

func main() { //获取当前运行的CPU数目 cpuNum := runtime.NumCPU() fmt.Println(“cpuNum=”, cpuNum) //可以自己设置使用多个cpu runtime.GOMAXPROCS(cpuNum - 1) fmt.Println(“ok”) }

<a name="fcCuH"></a>
# channel(管道)
<a name="WK8VF"></a>
## 通过实现一个需求引出channel
> 现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。最后显示出来。要求使用 goroutine 完成
> 思路分析:
> - 使用 goroutine 来完成,效率高,但是会出现并发/并行安全问题
> - 这里就提出了不同 goroutine 如何通信的问题

```go
package main
import (
    "fmt"
    _ "time"
    "sync"
)

// 需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到map中。
// 最后显示出来。要求使用goroutine完成 

// 思路
// 1. 编写一个函数,来计算各个数的阶乘,并放入到 map中.
// 2. 我们启动的协程多个,统计的将结果放入到 map中
// 3. map 应该做出一个全局的.

var (
    myMap = make(map[int]int, 10)  
    //声明一个全局的互斥锁
    //lock 是一个全局的互斥锁, 
    //sync 是包: synchornized 同步
    //Mutex : 是互斥
    lock sync.Mutex
)

// test 函数就是计算 n!, 让将这个结果放入到 myMap
func test(n int) {

    res := 1
    for i := 1; i <= n; i++ {
        res *= i
    }

    //这里我们将 res 放入到myMap
    //加锁
    lock.Lock()
    myMap[n] = res //concurrent map writes?
    //解锁
    lock.Unlock()
}

func main() {

    // 我们这里开启多个协程完成这个任务[200个]
    for i := 1; i <= 200; i++ {
        go test(i)
    }

    //等待所有 goroutine 全部完成的时间很难确定,这里休眠10秒钟,仅仅是估计
    //time.Sleep(time.Second * 5)

    //这里我们输出结果,变量这个结果
    lock.Lock()
    for i, v := range myMap {
        fmt.Printf("map[%d]=%d\n", i, v)
    }
    lock.Unlock()
}

小结:

  1. 高并发时对map并发写会产生竞争,产生代码错误 concurrent map writes
  2. 为解决这个问题需要加入全局变量的互斥锁
  3. 使用channel可以更优雅地解决这个问题

    为什么需要channel

  4. 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美

  5. 主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
  6. 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作状态,这时也会随主线程的退出而销毁
  7. 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
  8. 上面种种分析都在呼唤一个新的通讯机制-channel

    channel 的基本介绍

  9. channle 本质就是一个数据结构-队列

  10. 数据是先进先出【FIFO : first in first out】
  11. 线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的
  12. channel 有类型的,一个 string 的 channel 只能存放 string 类型数据。

goroutine和 channel - 图4

定义/声明 channel

var 变量名 chan 数据类型

var    intChan    chan int //intChan 用于存放 int 数据
var    mapChan chan map[int]string //mapChan 用于存放 map[int]string 类型
var    perChan    chan Person
var    perChan2 chan *Person
//说明
//channel 是引用类型
//channel 必须初始化才能写入数据, 即 make 后才能使用
//管道是有类型的,intChan 只能写入 整数 int

快速入门

package main
import (
    "fmt"
)

func main() {

    //演示一下管道的使用
    //1. 创建一个可以存放3个int类型的管道
    var intChan chan int
    intChan = make(chan int, 3)

    //2. 看看intChan是什么
    fmt.Printf("intChan 的值=%v intChan本身的地址=%p\n", intChan, &intChan)


    //3. 向管道写入数据
    intChan<- 10
    num := 211
    intChan<- num
    intChan<- 50
    // //如果从channel取出数据后,可以继续放入
    <-intChan
    intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量


    //4. 看看管道的长度和cap(容量)
    fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 3, 3

    //5. 从管道中读取数据

    var num2 int
    num2 = <-intChan 
    fmt.Println("num2=", num2)
    fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan))  // 2, 3

    //6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock

    num3 := <-intChan
    num4 := <-intChan

    //num5 := <-intChan

    fmt.Println("num3=", num3, "num4=", num4/*, "num5=", num5*/)
}
intChan 的值=0xc000110080 intChan本身的地址=0xc000006028
channel len= 3 cap=3 
num2= 211
channel len= 2 cap=3
num3= 50 num4= 98

channel 使用的注意事项

  1. channel 中只能存放指定的数据类型
  2. channle 的数据放满后,就不能再放入了
  3. 如果从 channel 取出数据后,可以继续放入
  4. 在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock
  5. chan 数据类型为interface{}时,可以存放任意类型的数据,复杂数据类型从管道中取出时需要进行类型断言再使用 ```go type Cat struct { Name string Age int }

func main() {

//定义一个存放任意数据类型的管道 3个数据
//var allChan chan interface{}
allChan := make(chan interface{}, 3)

allChan<- 10
allChan<- "tom jack"
cat := Cat{"小花猫", 4}
allChan<- cat

//我们希望获得到管道中的第三个元素,则先将前2个推出
<-allChan
<-allChan

newCat := <-allChan //从管道中取出的Cat是什么?

fmt.Printf("newCat=%T , newCat=%v\n", newCat, newCat)
//下面的写法是错误的!编译不通过
// fmt.Printf("newCat.Name=%v", newCat.Name)
//使用类型断言
a := newCat.(Cat) 
fmt.Printf("newCat.Name=%v", a.Name)

}

<a name="nvUIf"></a>
## channel 的遍历和关闭
<a name="u1gtj"></a>
### channel 的关闭
使用内置函数`close` 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据
```go
func main() {
    intChan := make(chan int, 3)
    intChan<- 100
    intChan<- 200
    close(intChan) // close
    //这时不能够再写入数到channel
    // intChan<- 300 //panic: send on closed channel
    fmt.Println("okook~")
    //当管道关闭后,读取数据是可以的
    n1 := <-intChan
    fmt.Println("n1=", n1)
}

channel 的遍历

channel 支持 for—range 的方式进行遍历,请注意两个细节

  1. 在遍历时,如果 channel 没有关闭,则会出现 deadlock 的错误
  2. 在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历

    func main() {
     //遍历管道
     intChan2 := make(chan int, 100)
     for i := 0; i < 100; i++ {
         intChan2<- i * 2  //放入100个数据到管道
     }
    
     //遍历管道不能使用普通的 for 循环
     // len := len(intChan2)
     // for i := 0; i < len; i++ {
     //     v := <-intChan2 //管道中的数据全部取出了
     //     fmt.Printf("v: %v\n", v)
     // }
     //在遍历时,如果channel没有关闭,则会出现deadlock的错误
     //在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历
     close(intChan2)
     fmt.Printf("len(intChan2): %v\n", len(intChan2))//len(intChan2): 100
     for v := range intChan2 {
         fmt.Println("v=", v)
     }
     fmt.Printf("len(intChan2): %v\n", len(intChan2))//len(intChan2): 0
    }
    

    应用实例

    请完成goroutine和channel协同工作的案例,具体要求:

    • 开启一个writeData协程,向管道intChan中写入50个整数
    • 开启一个readData协程,从管道intChan中读取writeData写入的数据
    • 注意:writeData和readDate操作的是同一个管道
    • 主线程需要等待writeData和readDate协程都完成工作才能退出
package main
import (
    "fmt"
    "time"
)


//write Data
func writeData(intChan chan int) {
    for i := 1; i <= 50; i++ {
        //放入数据
        intChan<- i //
        fmt.Println("writeData ", i)
        //time.Sleep(time.Second)
    }
    close(intChan) //关闭
}

//read data
func readData(intChan chan int, exitChan chan bool) {

    for {
        v, ok := <-intChan
        if !ok {
            break
        }
        time.Sleep(time.Second)
        fmt.Printf("readData 读到数据=%v\n", v) 
    }
    //readData 读取完数据后,即任务完成
    exitChan<- true
    close(exitChan)

}

func main() {

    //创建两个管道
    intChan := make(chan int, 10)
    exitChan := make(chan bool, 1)

    go writeData(intChan)
    go readData(intChan, exitChan)

    for {
        _, ok := <-exitChan //从exitChan中取不出数据,主线程就会阻塞在这里
        fmt.Printf("ok: %v\n", ok)
        if !ok {
            break
        }
    }

}

要求统计 1-200000 的数字中,哪些是素数 分析思路:

  • 传统的方法,就是使用一个循环,循环的判断各个数是不是素数
  • 使用并发/并行的方式,将统计素数的任务分配给多个(4 个)goroutine 去完成,完成任务时间短
package main

import (
    "fmt"
    "runtime"
)

//putNum 协程 放入数据
func putNum(intChan chan int) {
    for i := 1; i < 200000; i++ {
        intChan <- i
    }
    //数据放完后关闭协程
    close(intChan)
}

//primeNum 协程 取出num,如果是素数,放入primeChan
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    var isPrime bool
    for {
        isPrime = true //假设是素数
        num, ok := <-intChan
        if !ok { //从intChan中取不出数据 退出循环
            break
        }
        for i := 2; i < num; i++ {
            if num%i == 0 {
                isPrime = false
                break
            }
        }
        if isPrime { //是素数,放入primeChan中
            primeChan <- num
        }
    }
    //工作完成后,向exitChan中写入数据
    exitChan <- true
}

func main() {
    cpuNum := runtime.NumCPU()          //获取当前运行cpu数,充分利用cpu性能
    intChan := make(chan int, 100)      //待处理数字暂存管道 buffer
    primeChan := make(chan int, 1000)   //存放素数 result
    exitChan := make(chan bool, cpuNum) //如果管道中有8个元素,程序退出

    go putNum(intChan)
    for i := 0; i < cpuNum; i++ {
        go primeNum(intChan, primeChan, exitChan)
    }

    go func() { //启动一个匿名协程,监视任务是否已经完成
        for i := 0; i < cpuNum; i++ {
            <-exitChan //exitChan取完数据后代表primeNum协程已经完成工作
        }
        //关闭primeChan
        close(primeChan)
    }()

    //打印输出结果
    for v := range primeChan {
        fmt.Printf("v: %v\n", v)
    }

}

channel 使用细节

  1. channel 可以声明为只读,或者只写性质

     //管道可以声明为只读或者只写
    
     //1. 在默认情况下下,管道是双向
     var chan1 chan int //可读可写
     fmt.Printf("chan1: %v\n", chan1)
    
     //2 声明为只写
     var chan2 chan<- int
     chan2 = make(chan int, 3)
     chan2<- 20
     // num := <-chan2 //error
    
     fmt.Println("chan2=", chan2)
    
     //3. 声明为只读
     var chan3 <-chan int
     num2 := <-chan3
     //chan3<- 30 //err
     fmt.Println("num2", num2)
    
  2. 使用 select 可以解决从管道取数据的阻塞问题 ```go package main import ( “fmt” “time” )

func main() {

//使用select可以解决从管道取数据的阻塞问题

//1.定义一个管道 10个数据int
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
    intChan<- i
}
//2.定义一个管道 5个数据string
stringChan := make(chan string, 5)
for i := 0; i < 5; i++ {
    stringChan <- "hello" + fmt.Sprintf("%d", i)
}

//传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock

//问题,在实际开发中,可能我们不好确定什么关闭该管道.
//可以使用select 方式可以解决
//label:
for {
    select {
        //注意: 这里,如果intChan一直没有关闭,不会一直阻塞而deadlock
        //,会自动到下一个case匹配
        case v := <-intChan : 
            fmt.Printf("从intChan读取的数据%d\n", v)
            time.Sleep(time.Second)
        case v := <-stringChan :
            fmt.Printf("从stringChan读取的数据%s\n", v)
            time.Sleep(time.Second)
        default :
            fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑\n")
            time.Sleep(time.Second)
            return 
            //break label
    }
}

}


3. goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题
```go
package main
import (
    "fmt"
    "time"
)

//函数
func sayHello() {
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second)
        fmt.Println("hello,world")
    }
}
//函数
func test() {
    //这里我们可以使用defer + recover
    defer func() {
        //捕获test抛出的panic
        if err := recover(); err != nil {
            fmt.Println("test() 发生错误", err)
        }
    }()
    //定义了一个map
    var myMap map[int]string
    myMap[0] = "golang" //error
}

func main() {

    go sayHello()
    go test()


    for i := 0; i < 10; i++ {
        fmt.Println("main() ok=", i)
        time.Sleep(time.Second)
    }

}