概念

  1. [并发]
  2. :一个cpu同时执行多项任务,通过时间分片轮询 (或多核心调度)算法,选择执行任务,任务同时执行,就是并发
  3. [进程]
  4. cpu切换程序时,需要保存上一个程序的状态(context--上下文切换),就会丢失程序运行状态,引入进程概念
  5. :进程用以划分程序运行时所需要的资源,因此进程就是一个程序运行时候的所需要的基本资源单位 (运行实体)
  6. [并行]
  7. :多个cpu(多核心),每个cpu都有进程在运行(同一时刻,多个任务同时执行),这就是并行
  8. [用户态与内核态]
  9. :系统分为两种运行状态,用户态以及内核态
  10. [线程]
  11. :进程切换需要消耗大量资源,线程本身几乎不占有资源,他们共享进程里的资源,内核调度切换耗费资源
  12. [协程]
  13. :线程还是需要内核去进行调度,切换起来也是需要把用户态的数据写入到内核态,也是需要耗费一定的计算机资源
  14. :协程就是把自己的调度算法交给程序(用户态)去进行管理,能以更小的资源去进行并发。

多线程(multi-gopher)、协程

[goroutine]

    :Goroutine是建立在线程之上的轻量级的抽象,其以非常低的代价在同一个地址空间中并行地执行多个函数方法

  :相比于线程,其创建和销毁代价小很多,并且其调度是独立于线程的

  :Goroutine并不会比线程更快,只是增加了更多的并发性。

  :当一个goroutine被阻塞(比如等待IO),golang的scheduler会调度其它可以执行的goroutine运行。

[优点]

    :内存消耗更少,Goroutine所需要的内存通常只有2kb,而线程则需要1Mb

  :创建与销毁的开销更小,线程创建时需要向操作系统申请资源,并且在销毁时将资源归还

  :切换开销更小,线程的调度方式是抢占式的;goroutine的调度是协同式的, 不会直接地与操作系统内核打交道

[调度]

    :Processor(P)

  :OSThread(M

  :Goroutines(G)

  => 可用的线程数是通过GOMAXPROCS来设置,默认值是可用的CPU核数

  => OSThread 调度在processor上,goroutines调度在OSThreads上

[使用]

    :go func()

[阻塞主线程]

    :sync.WaitGroup 等待所有运行的goroutine运行结束后,再退出main函数 # 维护一个goroutine数量的计数器

    // 是Go语言标准库的一部分;用于等待一组goroutine结束运行
    :var n sync.WaitGroup

  :n.Add(1)     # 加一

  :defer n.Done # 减一

  :n.Wait()     # 判断是否等于 0 (阻塞)
[死锁]

    :
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
  // 维护一个goroutine数量的计数器
    var n sync.WaitGroup

    for i := 0; i < 20; i++ {
        n.Add(1)
        go func(i int, n *sync.WaitGroup) {
            defer n.Done()
            time.Sleep(1 * time.Second)
            fmt.Printf("goroutine %d is running\n", i)
        }(i, &n)
    }

  // wait方法会一直阻塞,知道计数器为0
    n.Wait()
}

image.png

image.png

Golang的调度器可以利用多processor资源,在任意时刻,M个goroutine需要被调度到N个OS threads上;

同时这些threads运行在至多GOMAXPROCS个processor上(N <= GOMAXPROCS)。

Go scheduler将可运行的goroutines分配到多个运行在一个或多个processor上的OS threads上。

每个processor有一个本地goroutine队列。同时有一个全局的goroutine队列。每个OSThread都会被分配给一个processor。最多只能有GOMAXPROCS个processor,每个processor同时只能执行一个OSThread。Scheculer可以根据需要创建OSThread。
image.png

在每一轮调度中,scheduler找到一个可以运行的goroutine并执行直到其被阻塞;操作系统的一个线程下可以并发执行上千个goroutine,每个goroutine所占用的资源和切换开销都很小,因此,goroutine是golang适合高并发场景的重要原因。

并发安全

[概念]

    :函数在线程程序,和并发情况下可以正确允许,那么这个函数是并发安全的。

    :一个特定类型的一些方法和操作函数,如果该类型是并发安全的,那么所有它的访问方法和操作都是并发安全的

[问题]

    :死锁 DeadLock

    :活锁 LiveLock

    :饿死 ResourceStarvation

[竞争条件]

    :指程序在多个 goroutine 交叉执行时,运行结果无法保持正确

    :数据竞争

数据竞争

[概念]

    :多个goroutine 并发访问同一个变量,且至少一个是写操作时会发生数据竞争

[消除数据竞争]

    :01 不去写变量,创建goroutine 之前初始化阶段,就初始化完成,其后都只是读取

    :02 避免多个goroutine访问变量,(变量的监控 monitor goroutine)
        => "不要使用共享内存来通信,使用通信来共享内存"

    :03 允许多个gorountine访问变量,但同一时刻只有一个gorountine能访问该变量,即互斥

        => 互斥锁 - sync.Mutex   (Lock() 获取锁、Unlock() 释放锁) ,该锁不能呢个重复获取
        => 读写锁 - sync.RWMutex (RLlock() 获取锁、RUnlock() 释放锁,写操作互斥,读操作并行)
        => 一次性初始化 - sync.Once
// "不要使用共享内存来通信,使用通信来共享内存"

var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance

func Deposit(amount int) { deposits <- amount }
func Balance() int       { return <-balances }

func teller() {
    // balance is confined to teller goroutine
    var balance int
    for {
        select {
        case amount := <-deposits:
            balance += amount
        case balances <- balance:
        }
    }
}

func init() {
    // start the monitor goroutine
    go teller()
}

通信机制 - 通信并发

channel 信道

[channel - 信道]

    :Channels允许线程间通信,goroutines 可以往里面发消息,也可以从中接收其它go routines 的消息

    :channel := make(chan int)

    :bufferedChan := make(chan string, 3) # 创建有缓冲信道

[使用]

    :var channel chan int = make(chan int)

[信道和取消息]

    :信道默认的存消息和取消息都是阻塞的 (无缓冲的信道)

    :无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好
      => 从无缓冲信道取数据,必须要有数据流进来才可以,否则当前线阻塞
    => 数据流入无缓冲信道, 如果没有其他goroutine来拿走这个数据,那么当前线阻塞

[创建信道]

     :var channel chan int = make(chan int)

[读信道]

    :x := <-channel

    :fmt.Println(x)

[写信道]

     :y := 1

     :channel -< y

匿名Goroutines

// Anonymous go routine
go func() {
    fmt.Println("I'm running in my own go routine")
}()

image.png

非阻塞读

// select case 语句可以实现对 channel 的非阻塞读
// select 是语言级内置, 非阻塞

myChan := make(chan string)

go func(){
    myChan <- "Message!"
}()

select {
    case msg := <- myChan:
        fmt.Println(msg)
    default:
        fmt.Println("No Msg")
}
<-time.After(time.Second * 1)

select {
    case msg := <- myChan:
        fmt.Println(msg)
    default:
        fmt.Println("No Msg")
}

非阻塞写

// select case 语句可以实现对 channel 的非阻塞写
// 用于从一组可能的通讯中选择一个进一步处理

select {
    case myChan <- "message":
        fmt.Println("sent the message")
    default:
        fmt.Println("no message sent")
}

同步 - 共享内存并发

互斥锁 sync.Mutex

[同步锁]

    // 所有对共享数据的访问,不管读写,仅当goroutine持有锁才能操作
    :n := sync.Mutex // 互斥锁

    : n.Lock()

  :n.Unlock()

[原子操作子包 - 基础数据类型的原子操作函数]

    :sync.atomic

  :func CompareAndSwapUint64(val *uint64, old, new uint64) (swapped bool)

定义了一个函数使用并发来推迟触发一个事件

// 函数Publish在给定时间过期后打印text字符串到标准输出
// 该函数并不会阻塞而是立即返回
func Publish(text string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
    }()    // 注意这里的括号。必须调用匿名函数
}

func main() {
    Publish("A goroutine starts a new thread of execution.", 5*time.Second)
    fmt.Println("Let’s hope the news will published before I leave.")
    // 等待发布新闻
    time.Sleep(10 * time.Second)
    fmt.Println("Ten seconds later: I’m leaving now.")
}

不要通过共享内存来通讯,而是通过通讯来共享内存

func sharingIsCaring() {
    // 定义管道
    ch := make(chan int)
    go func() {
        // 仅为一个goroutine可见的局部变量
        n := 0
        n++
        // 数据从一个goroutine离开...
        ch <- n
    }()

    // ...然后安全到达另一个goroutine
    n := <-ch
    n++
    fmt.Println(n) // 输出: 2
}

并发数据结构,AtomicInt,用于存储一个整型值

// AtomicInt是一个并发数据结构, 持有一个整数值, 该数据结构的零值为0
type AtomicInt struct {
    // 锁,一次仅能被一个goroutine持有
    mu sync.Mutex
    n  int
}

// Add方法作为一个原子操作将n加到AtomicInt
func (a *AtomicInt) Add(n int) {
    a.mu.Lock() // 等待锁释放,然后持有它
    a.n += n
    a.mu.Unlock() // 释放锁
}

// Value方法返回a的值
func (a *AtomicInt) Value() int {
    a.mu.Lock()
    n := a.n
    a.mu.Unlock() // 整个结构被解锁了
    return n
}

func lockItUp() {
    wait := make(chan struct{})
    var n AtomicInt
    go func() {
        n.Add(1) // 一个访问
        close(wait)
    }()
    n.Add(1) // 另一个并发访问
    <-wait
    fmt.Println(n.Value()) // 输出: 2
}

读写锁 sync.RWMutex

[多读锁]

    :n := sync.RWMutex

  :n.RLock()

  :n.RUnlock()

sync.Once 初始化

[全局唯一操作 - 保证在全局范围内仅仅调用指定的函数一次]

    :sync.Once.Do()

  :var once sync.Once

  :once.Do(func())

数据竞争自动检测

$ go run -race raceClosure.go
Race:
==================
WARNING: DATA RACE
Read by goroutine 2:
    main.func·001()
      ../raceClosure.go:22 +0x65

Previous write by goroutine 0:
    main.race()
        ../raceClosure.go:20 +0x19b
    main.main()
        ../raceClosure.go:10 +0x29
    runtime.main()
        ../go/src/pkg/runtime/proc.c:248 +0x91

Goroutine 2 (running) created at:
    main.race()
      ../raceClosure.go:24 +0x18b
    main.main()
      ../raceClosure.go:10 +0x29
     runtime.main()
      ../go/src/pkg/runtime/proc.c:248 +0x91

==================
55555
Correct:
01234
Also correct:
01324
Found 1 data race(s)
exit status 66

// 该工具发现一处数据竞争,包含:一个goroutine在第20行对一个变量进行写操作
// 跟着另一个goroutine在第22行对同一个变量进行了未同步的读操作。

并行计算


func init() {
    numCpu := runtime.NumCPU()
    // 尝试使用所有可用的CPU
    runtime.GOMAXPROCS(numCpu)
}

type Vector []float64

// 函数mul 返回 Σ u[i]*v[j], i + j = k.
func mul(u, v Vector, k int) (res float64) {
    n := min(k+1, len(u))
    j := min(k, len(v)-1)
    for i := k - j; i < n; i, j = i+1, j-1 {
        res += u[i] * v[j]
    }
    return
}

// 思路很简单:确定合适大小的工作单元,然后在不同的goroutine中执行每个工作单元
// 函数Convolve 计算 w = u * v,其中 w[k] = Σ u[i]*v[j], i + j = k
// 先决条件:len(u) > 0, len(v) > 0
func Convolve(u, v Vector) (w Vector) {
    n := len(u) + len(v) - 1
    w = make(Vector, n)

    for k := 0; k < n; k++ {
        w[k] = mul(u, v, k)
    }
    return
}

// 并发版本
func ConvolveGo(u, v Vector) (w Vector) {
    n := len(u) + len(v) - 1
    w = make(Vector, n)

    // 将 w 切分成花费 ~100μs-1ms 用于计算的工作单元
    size := max(1, 1<<20/n)

    wg := new(sync.WaitGroup)
    wg.Add(1 + (n-1)/size)
    for i := 0; i < n && i >= 0; i += size { // 整型溢出后 i < 0
        j := i + size
        if j > n || j < 0 { // 整型溢出后 j < 0
            j = n
        }

        // 这些goroutine共享内存,但是只读
        go func(i, j int) {
            for k := i; k < j; k++ {
                w[k] = mul(u, v, k)
            }
            wg.Done()
        }(i, j)
    }
    wg.Wait()
    return
}

// 业务计算
func max(a, b int) int {
    return 1;
}

// 业务计算
func min(a, b int) int {
    return b;
}