这可能是最容易理解的 Go Mutex 源码剖析

6-1 channel

代码案例

code1 基本chan收发

  1. package main
  2. import "fmt"
  3. func chanDemo() {
  4. //var c chan int // c == nil
  5. c := make(chan int)
  6. c <- 1
  7. c <- 2
  8. c <- 3
  9. n:= <-c
  10. fmt.Println(n)
  11. }
  12. func main() {
  13. chanDemo()
  14. }
  15. /**
  16. fatal error: all goroutines are asleep - deadlock!
  17. goroutine 1 [chan send]:
  18. main.chanDemo()
  19. E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:8 +0x37
  20. main.main()
  21. */

死锁了

发的数据没人收是会deadlock的
1635301183810

code2 加上等待,避免主线程停止而导致其他chan都被干掉

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func chanDemo() {
  7. //var c chan int // c == nil
  8. c := make(chan int)
  9. go func() {
  10. for {
  11. n:= <- c
  12. fmt.Println(n)
  13. }
  14. }()
  15. c <- 1
  16. c <- 2
  17. //c <- 3
  18. //n := <-c
  19. //fmt.Println(n)
  20. time.Sleep(time.Millisecond)
  21. }
  22. func main() {
  23. chanDemo()
  24. }
  25. /**
  26. 1
  27. 2
  28. Process finished with the exit code 0
  29. */

go语言函数是一等公民

go语言中的channel 也是一等公民

code3 worker提取出来

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func worker(c chan int) {
  7. for {
  8. n := <-c
  9. fmt.Println(n)
  10. }
  11. }
  12. func chanDemo() {
  13. //var c chan int // c == nil
  14. c := make(chan int)
  15. go worker(c)
  16. c <- 1
  17. c <- 2
  18. //c <- 3
  19. //n := <-c
  20. //fmt.Println(n)
  21. time.Sleep(time.Millisecond)
  22. }
  23. func main() {
  24. chanDemo()
  25. }
  26. /**
  27. 1
  28. 2
  29. Process finished with the exit code 0
  30. */

code4 chan可以通过外部参数传递进来

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func worker(id int,c chan int) {
  7. for {
  8. fmt.Println(id,<-c)
  9. }
  10. }
  11. func chanDemo() {
  12. var cahnneles [10]chan int
  13. for i := 0; i < 10; i++ {
  14. //var c chan int // c == nil
  15. cahnneles[i] = make(chan int)
  16. go worker(i,cahnneles[i])
  17. }
  18. for i := 0; i < 10; i++ {
  19. cahnneles[i] <- 'a' + i
  20. }
  21. time.Sleep(time.Millisecond)
  22. }
  23. func main() {
  24. chanDemo()
  25. }
  26. /**
  27. 5 102
  28. 1 98
  29. 2 99
  30. 3 100
  31. 4 101
  32. 0 97
  33. 6 103
  34. 7 104
  35. 9 106
  36. 8 105
  37. Process finished with the exit code 0
  38. */

在打印

code5 格式化一下

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func worker(id int, c chan int) {
  7. for {
  8. fmt.Printf("%d,%c\n", id, <-c)
  9. }
  10. }
  11. func chanDemo() {
  12. var cahnneles [10]chan int
  13. for i := 0; i < 10; i++ {
  14. //var c chan int // c == nil
  15. cahnneles[i] = make(chan int)
  16. go worker(i, cahnneles[i])
  17. }
  18. for i := 0; i < 10; i++ {
  19. cahnneles[i] <- 'a' + i
  20. }
  21. for i := 0; i < 10; i++ {
  22. cahnneles[i] <- 'A' + i
  23. }
  24. time.Sleep(time.Millisecond)
  25. }
  26. func main() {
  27. chanDemo()
  28. }
  29. /**
  30. 5,f
  31. 2,c
  32. 0,a
  33. 1,b
  34. 3,d
  35. 0,A
  36. 1,B
  37. 9,j
  38. 8,i
  39. 4,e
  40. 4,E
  41. 3,D
  42. 7,h
  43. 2,C
  44. 6,g
  45. 6,G
  46. 7,H
  47. 9,J
  48. 5,F
  49. 8,I
  50. Process finished with the exit code 0
  51. */

goroutine 调度之后,先发的不一定会先收到

code6 并发从chan读取

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func createWorker(id int) chan int {
  7. c := make(chan int)
  8. go func() {
  9. for {
  10. fmt.Printf("@%d---%c\n", id, <-c)
  11. }
  12. }()
  13. return c
  14. }
  15. func chanDemo() {
  16. var cahnneles [10]chan int
  17. for i := 0; i < 10; i++ {
  18. //var c chan int // c == nil
  19. cahnneles[i] = createWorker(i)
  20. }
  21. for i := 0; i < 10; i++ {
  22. cahnneles[i] <- 'a' + i
  23. }
  24. for i := 0; i < 10; i++ {
  25. cahnneles[i] <- 'A' + i
  26. }
  27. time.Sleep(time.Millisecond)
  28. }
  29. func main() {
  30. chanDemo()
  31. }
  32. /**
  33. @0---a
  34. @8---i
  35. @1---b
  36. @1---B
  37. @3---d
  38. @4---e
  39. @5---f
  40. @6---g
  41. @7---h
  42. @0---A
  43. @9---j
  44. @2---c
  45. @2---C
  46. @5---F
  47. @6---G
  48. @7---H
  49. @3---D
  50. @4---E
  51. @9---J
  52. @8---I
  53. Process finished with the exit code 0
  54. */

// // 告诉外面用的人 , 我这个channel怎么用

1635302653609
这样就不能收数据了,只能发 属性 (send-only type)

//n:= <-cahnneles[i] 
// ↑ Invalid operation: <-cahnneles[i] (receive from the send-only type chan<- int)

code7 chan<- int 只能够收/发 的类型 实现

package main

import (
    "fmt"
    "time"
)

// // 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan<- int { // 告诉外面用的人 , 我这个channel怎么用
    //
    c := make(chan int)
    go func() {
        for {
            fmt.Printf("@%d---%c\n", id, <-c)
        }
    }()
    return c
}

func chanDemo() {

    var cahnneles [10]chan<- int
    for i := 0; i < 10; i++ {
        //var c chan int // c == nil
        cahnneles[i] = createWorker(i)
        //n:= <-cahnneles[i]
        // ↑ Invalid operation: <-cahnneles[i] (receive from the send-only type chan<- int)
    }

    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)

}

func bufferedChannel() {
    c := make(chan int)
    c <- 1
}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
goroutine 1 [chan send]:
main.bufferedChannel(...)
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:42
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:47 +0x31

Process finished with the exit code 2

 */

这样就是到4个 追加的时候才报错

code8 加上缓冲区(指定长度)

func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int,3)
    c <- 1
    c <- 2
    c <- 3
    c <- 4

}

code9 chan 不用同步收发了就

package main

import (
    "fmt"
    "time"
)

func worker(id int,c chan int) {
    for {
        fmt.Printf("@%d---%c\n", id, <-c)
    }
}


func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int,3)
    go worker(0,c)
    c <- '1'
    c <- '2'
    c <- '3'
    //c <- 4
    time.Sleep(time.Millisecond)



}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
@0---1
@0---2
@0---3

Process finished with the exit code 0

 */

告诉接收方,何时发完了

code10 能够通过 err判断 chan是否 没消息了

package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        n,ok := <-c
        if ok{
            fmt.Printf("@%d---%d\n", id,n)
        }else{
            break
        }
    }
}


func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int)
    go worker(0, c)
    c <- '1'
    c <- '2'
    c <- '3'
    c <- 'd'
    //c <- 4
    close(c)
    time.Sleep(time.Millisecond)

}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
@0---49
@0---50
@0---51
@0---100

Process finished with the exit code 0

*/

code11 chan的for in 遍历

func worker(id int, c chan int) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
    }
}
// 这样也可以

理论channel

  • channel
  • buffered channel
  • range
  • 理论基础:Communication Sequential Process (CSP)

    go语言的创作者说

go语言的创作者说:

Don’t communicate by sharing memory; sharememory by communicating.
不要通过共享内存来通讯;通过通信来共享内存

6-2 使用Channel等待任务结束

通过通信来共享内存

code01 顺序打印待修复

目前是按照顺序打印的
package main

import (
    "fmt"
    "time"
)

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        done <- true
    }
}

type worker struct {
    in   chan int
    done chan bool
}

// // 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }

    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        res := <-workers[i].done
        print(res)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        res := <-workers[i].done
        print(res)
    }
    time.Sleep(time.Second)

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
    //bufferedChannel()
}

/**
Channel as first-class citizen
@0---97
@1---98
@2---99
@3---100
@4---101
@5---102
@6---103
@7---104
@8---105
@9---106
@0---65
@1---66
@2---67
@3---68
@4---69
@5---70
@6---71
@7---72
@8---73
@9---74
truetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetrue
Process finished with the exit code 0

*/

code02 报错了

package main

import (
    "fmt"
    "time"
)

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        done <- true
    }
}

type worker struct {
    in   chan int
    done chan bool
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }

    for i, w := range workers {
        w.in <- 'a' + i
        //res := <-workers[i].done
        //print(res)
    }
    for i, w := range workers {
        w.in <- 'A' + i
        //res := <-workers[i].done
        //print(res)
    }

    // wait for all of them
    for _, w := range workers {
        res := <-w.done
        res2 := <-w.done
        print(res)
        print(res2)
    }
    time.Sleep(time.Second)

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@4---101
@2---99
@3---100
@9---106
@5---102
@6---103
@8---105
@7---104
@1---98
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.chanDemo()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:45 +0x15d
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:63 +0x57

goroutine 6 [chan send]:



*/

大写字母那部分,发完了,没有来接收的,发会阻塞住

因为 channel 没有长度

code03 等待的也 开goroutine

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        go func() {
            done <- true
        }()
    }
}
//这样就可以了

这是因为我们需要等2次,如果我们只是等一次就不用的

code04 waitgroup方式来进行判断

package main

import (
    "fmt"
    "sync"
    //"time"
)

func doWork(
    id int, c chan int, wg *sync.WaitGroup,
) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        wg.Done()
    }
}

type worker struct {
    in chan int
    wg *sync.WaitGroup
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int, wg *sync.WaitGroup) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {

    var wg sync.WaitGroup

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }
    wg.Add(20)

    for i, w := range workers {
        w.in <- 'a' + i
        //wg.Add(1)

    }
    for i, w := range workers {
        w.in <- 'A' + i
    }

    // wait for all of themtime.Sleep(time.Second)
    wg.Wait()

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@4---101
@2---99
@3---100
@9---106
@5---102
@6---103
@8---105
@7---104
@1---98
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.chanDemo()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:45 +0x15d
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:63 +0x57

goroutine 6 [chan send]:


*/

code05 函数是一等公民之再次改进

package main

import (
    "fmt"
    "sync"
    //"time"
)

func doWork(id int, w worker) {
    for n := range w.in {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        w.done()
    }
}

type worker struct {
    in   chan int
    done func()
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int, wg *sync.WaitGroup) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in: make(chan int),
        // 函数式编程 
        // 匿名函数来赋值
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w)
    return w
}

func chanDemo() {

    var wg sync.WaitGroup

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }
    wg.Add(20)

    for i, w := range workers {
        w.in <- 'a' + i
        //wg.Add(1)

    }
    for i, w := range workers {
        w.in <- 'A' + i
    }

    // wait for all of themtime.Sleep(time.Second)
    wg.Wait()

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@1---98
@2---99
@2---67
@3---100
@3---68
@4---101
@4---69
@5---102
@5---70
@6---103
@6---71
@7---104
@7---72
@8---105
@8---73
@9---106
@9---74
@0---65
@1---66

Process finished with the exit code 0


*/

6-3 使用Channel进行树的遍历

code01

package tree

import "fmt"

type Node struct {
    Value       int
    Left, Right *Node
}

func (node Node) Print() {
    fmt.Print(node.Value, " ")
}

func (node *Node) SetValue(value int) {
    if node == nil {
        fmt.Println("Setting Value to nil " +
            "node. Ignored.")
        return
    }
    node.Value = value
}

func CreateNode(value int) *Node {
    return &Node{Value: value}
}
package tree

import "fmt"

func (node *Node) Traverse() {
    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }

    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}

func (node *Node) TraverseWithChannel() chan *Node {
    out := make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out)
    }()
    return out
}

6-4 Select

使用select 进行调度

code1 非阻塞式的处理

package main

import "fmt"

func main() {
    var c1, c2 chan int
    select {
    case n := <-c1:
        fmt.Println("received from c1:", n)
    case n := <-c2:
        fmt.Println("received from c2:", n)
    default:
        fmt.Println("no val received")
    }

}
/**
no val received

Process finished with the exit code 0
非阻塞式的处理
 */

code02 死循环

func main() {
    var c1, c2 chan int
    for {
        select {
        case n := <-c1:
            fmt.Println("received from c1:", n)
        case n := <-c2:
            fmt.Println("received from c2:", n)
        default:
            fmt.Println("no val received")
        }
    }

}

code03 改进

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("received from c1:", n)
        case n := <-c2:
            fmt.Println("received from c2:", n)
        //default:
        //    fmt.Println("no val received")
        }
    }

}

/**
received from c1: 0
received from c2: 0
received from c2: 1
received from c1: 1
received from c1: 2
received from c2: 2
received from c1: 3
received from c2: 3
received from c2: 4
received from c2: 5
received from c1: 4
received from c1: 5
received from c1: 6
received from c2: 6
received from c1: 7
received from c2: 7
received from c1: 8

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/

code04 temp 改进中

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for {
        n, ok := <-c
        if ok {
            fmt.Printf("@%d---%d\n", id, n)
        } else {
            break
        }
    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    hasValue := false
    for {

        var activeWorker chan<- int
        if hasValue {
            activeWorker = work
        }
        select {
        case n = <-c1:
            hasValue = true
        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

 */

code05 sleep长一点就会出问题

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(5 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    hasValue := false
    for {

        var activeWorker chan<- int
        if hasValue {
            activeWorker = work
        }
        select {
        case n = <-c1:
            hasValue = true
        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }

}

/**
@0---0
@0---7
@0---12
@0---18


*/

新的数据会把之前的数据冲掉

code06 吧收到的n,存下来排队

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3
@0---3


*/

code07 让程序 10秒钟后结束

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3
@0---3
@0---4
bye

Process finished with the exit code 0

*/

code08 超时的情况

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            // 如果超过 800 毫秒 之内没有生成数据
            fmt.Println("timeout")
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
timeout
@0---3
timeout
@0---3
timeout
@0---4
bye

Process finished with the exit code 0


*/

code09 定时的加入 每秒显示长度

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    tick := time.Tick(time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            // 如果超过 800 毫秒 之内没有生成数据
            fmt.Println("timeout")
        case <-tick:
            fmt.Println("queue len is", len(values))
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
queue len is 3
@0---0
queue len is 4
@0---0
queue len is 5
@0---1
queue len is 10
@0---1
queue len is 10
@0---2
queue len is 11
@0---2
queue len is 12
@0---3
queue len is 13
@0---3
queue len is 14
@0---4
queue len is 15
bye

Process finished with the exit code 0

*/

6-5 传统同步机制

传统同步机制

  • WaitGroup
  • Mutex
  • Cond

    code0 init atomic

    ``` package main

import ( “fmt” “time” )

type atomicInt int

func (a atomicInt) increment() { a++ } func (a atomicInt) get() int { return int(a) }

func main() { var a atomicInt a.increment() go func() { a.increment() }() time.Sleep(time.Millisecond) fmt.Println(a) } /** 2

Process finished with the exit code 0 */

```
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic> go run -race .\atomic.go
==================
WARNING: DATA RACE
Read at 0x00c00000e0d8 by main goroutine:
  main.main()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:24 +0xee

Previous write at 0x00c00000e0d8 by goroutine 7:
  main.(*atomicInt).increment()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:11 +0x45
  main.main.func1()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:21 +0x2e

Goroutine 7 (finished) created at:
  main.main()

data race 存在

code01 加上锁的

package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock  sync.Mutex
}

func (a *atomicInt) increment() {
    a.lock.Lock()

    defer a.lock.Unlock()
    a.value++
}
func (a *atomicInt) get() int {
    a.lock.Lock()

    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

/**
2

Process finished with the exit code 0
*/
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic> go run -race .\atomic.go
2
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic>

这回没有data race了

code 02 部分加锁 (匿名函数)

func (a *atomicInt) increment() {
    fmt.Println("safe increment")
    func() {
        a.lock.Lock()

        defer a.lock.Unlock()
        a.value++
    }()
}
safe increment
safe increment
2

Process finished with the exit code 0

协程Coroutine

  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行

    这可能是最容易理解的 Go Mutex 源码剖析

    6-6 并发模式(上)

    code00 init

    ``` package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen() chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“message %d”, i) i++ } }() return c } func main() { // 生成消息 m := msgGen() for { fmt.Println(<-m)

}

}

/** message 0 message 1 message 2 message 3 message 4 message 5 message 6

*/

### code01 加上方向

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen() <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“message %d”, i) i++ } }() return c } func main() { // 生成消息 m := msgGen() for { fmt.Println(<-m) //m<- “abc” // 没有办法发数据 }

}

/** message 0 message 1 message 2 message 3 message 4 message 5 message 6

*/

### code 02 开两个 msgGen

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen() <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“message %d”, i) i++ } }() return c } func main() { // 生成消息 m1 := msgGen() m2 := msgGen() for { fmt.Println(<-m1) fmt.Println(<-m2) //m<- “abc” // 没有办法发数据 }

}

/** message 0 message 0 message 1 message 1 message 2 message 2 message 3 message 3 message 4 …

*/

### code03 加上参数

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen(name string) <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“service %s: message %d”, name, i) i++ } }() return c } func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”) for { fmt.Println(<-m1) fmt.Println(<-m2) //m<- “abc” // 没有办法发数据 }

}

/* service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 service 服务B: message 1 service 服务A: message 2 service 服务B: message 2 service 服务A: message 3 service 服务B: message 3 service 服务A: message 4 /


> 上面是交替的等待,显然是不对的


### code 04 让他们交替

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen(name string) <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“service %s: message %d”, name, i) i++ } }() return c }

func fanIn(c1, c2 <-chan string) chan string { c := make(chan string) go func() { // todo c1,c2 如何调度 for { c <- <-c1 } }() go func() { for { c <- <-c2 } }() return c } func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”) m := fanIn(m1, m2)

for {
    fmt.Println(<-m)
    //m<- "abc"
    // 没有办法发数据
}

}

/** service 服务A: message 0 service 服务B: message 0 service 服务B: message 1 service 服务A: message 1 service 服务B: message 2 service 服务B: message 3 service 服务B: message 4 service 服务A: message 2 service 服务B: message 5 service 服务B: message 6 service 服务A: message 3 service 服务B: message 7 service 服务A: message 4 service 服务A: message 5 service 服务B: message 8

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/

### code 05 select 方式

func fanInBySelect(c1, c2 <-chan string) chan string { c := make(chan string) go func() { for { select { case m := <-c1: c <- m case m := <-c2: c <- m } } }() return c }


> 比较 非select 和 select


## 6-7 并发模式(下)
### code00 出现问题 goroutine循环变量的坑

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen(name string) <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“service %s: message %d”, name, i) i++ } }() return c }

func fanIn(chs …<-chan string) chan string { c := make(chan string) for _, ch := range chs { go func() { // todo c1,c2 如何调度 for { c <- <-ch } }() }

return c

}

func fanInBySelect(c1, c2 <-chan string) chan string { c := make(chan string) go func() { for { select { case m := <-c1: c <- m case m := <-c2: c <- m } } }() return c }

func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”) m3 := msgGen(“服务C”) m := fanIn(m1, m2, m3)

for {
    fmt.Println(<-m)
    //m<- "abc"
    // 没有办法发数据
}

}

/** service 服务C: message 0 service 服务C: message 1 service 服务C: message 2 service 服务C: message 3 service 服务C: message 4 service 服务C: message 5 service 服务C: message 6

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/


> go语言forin循环的一个bug,导致只能拿到最后一次的

> go 开routine 的时候不会拿值,等到 调用的时候才会拿值


### code01 解决方式**self**
使用temp

func fanIn(chs …<-chan string) chan string { c := make(chan string) var chtemp <-chan string for , ch := range chs { ch_temp = ch go func() { // todo c1,c2 如何调度 for { c <- <-ch_temp } }() }

return c

}

或者这样

func fanIn(chs …<-chan string) chan string { c := make(chan string) for _, ch := range chs { chTemp := ch go func() { // todo c1,c2 如何调度 for { c <- <-chTemp } }() } return c }

终极解决,因为go语言全是值传递<br />所以

func fanIn(chs …<-chan string) chan string { c := make(chan string) for _, ch := range chs { //chTemp := ch go func(in <-chan string) { // todo c1,c2 如何调度 for { c <- <-in } }(ch) } return c }

![1635323116816](https://cdn.nlark.com/yuque/0/2021/png/2460262/1635351767079-dc425ee7-96ea-4402-99f6-c241bbc9ef20.png)
## 6-8 并发任务的控制
### code00  select 等待 (非阻塞等待)

func nonBlockingWait(c <-chan string) (string, bool) { select { case m := <-c: return m, true default: return “”, false } }

func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”)

for {
    fmt.Println(<-m1)
    if m, ok := nonBlockingWait(m2); ok {
        fmt.Println(m)
    }else {
        fmt.Println("no message from serve")
    }
}

}

/* service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 service 服务B: message 1 service 服务A: message 2 no message from serve service 服务A: message 3 no message from serve service 服务A: message 4 no message from serve service 服务A: message 5 service 服务B: message 2 /


---

### code01 timeout 支持设置等待时间(超时机制)

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) { select { case m := <-c: return m, true case <-time.After(timeout): // 没有等到 return “”, false } }

func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”)

for {
    fmt.Println(<-m1)
    if m, ok := timeoutWait(m2,2*time.Second); ok {
        fmt.Println(m)
    } else {
        fmt.Println("no message from serve")
    }
}

}

/** service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 service 服务B: message 1 service 服务A: message 2 service 服务B: message 2 service 服务A: message 3 service 服务B: message 3 service 服务A: message 4

*/


> 当 等待时间小于任务间隔时间 ,可能会出现 等待 (走到else {no message from serve")分支

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) { select { case m := <-c: return m, true case <-time.After(timeout): // 没有等到 return “”, false } }

func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”)

for {
    fmt.Println(<-m1)
    if m, ok := timeoutWait(m2,1*time.Second); ok {
        fmt.Println(m)
    } else {
        fmt.Println("no message from serve")
    }
}

}

/** service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 service 服务B: message 1 service 服务A: message 2 no message from serve service 服务A: message 3 service 服务B: message 2

*/

### code02 处理中断

package main

import ( “fmt” “math/rand” “time” )

// chan 是一等公民 func msgGen(name string) <-chan string { c := make(chan string) go func() { i := 0 for { time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000))) // sprintf 作为字符串打印 c <- fmt.Sprintf(“service %s: message %d”, name, i) i++ } }() return c }

func fanIn(chs …<-chan string) chan string { c := make(chan string) for _, ch := range chs { //chTemp := ch go func(in <-chan string) { // todo c1,c2 如何调度 for { c <- <-in } }(ch) } return c }

func fanInBySelect(c1, c2 <-chan string) chan string { c := make(chan string) go func() { for { select { case m := <-c1: c <- m case m := <-c2: c <- m } } }() return c }

func nonBlockingWait(c <-chan string) (string, bool) { select { case m := <-c: return m, true default: return “”, false } }

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) { select { case m := <-c: return m, true case <-time.After(timeout): // 没有等到 return “”, false } }

func main() { // 生成消息 m1 := msgGen(“服务A”) m2 := msgGen(“服务B”)

for i:=0;i<5; i++{
    fmt.Println(<-m1)
    if m, ok := timeoutWait(m2,1*time.Second); ok {
        fmt.Println(m)
    } else {
        fmt.Println("no message from serve")
    }
}

}

/** service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 no message from serve service 服务A: message 2 service 服务B: message 1 service 服务A: message 3 no message from serve service 服务A: message 4 service 服务B: message 2

Process finished with the exit code 0

*/


> 考虑时间场景中: 我的等待代码部分,是正在做事情,而我不希望 异常中断



> 准备做成: 能够通知我 退出 信号

// chan bool 或者 chan struct{} // 这个 chan struct{} 里面没有任何的数据,他比bool更加的省空间 func msgGen(name string, done chan struct{}) <-chan string { c := make(chan string) go func() { i := 0 for { // 判断 done 是否在 select { case <-time.After(time.Millisecond * time.Duration(rand.Intn(5000))): c <- fmt.Sprintf(“service %s: message %d”, name, i) case <-done: // 证明我是主动的退出 fmt.Println(“cleaning up”) return } i++ } }() return c }

主函数

func main() {

done :=make(chan struct{})
// 生成消息
m1 := msgGen("服务A",done)
m2 := msgGen("服务B",done)

for i := 0; i < 5; i++ {
    fmt.Println(<-m1)
    if m, ok := timeoutWait(m2, 1*time.Second); ok {
        fmt.Println(m)
    } else {
        fmt.Println("no message from serve")
        //time.Sleep(2*time.Second)
    }
}
done<- struct{}{}
// 送完停止信号,为了让看到结果,再等他个2s钟
time.Sleep(2*time.Second)

}

/** service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 no message from serve service 服务A: message 2 service 服务B: message 1 service 服务A: message 3 no message from serve service 服务A: message 4 service 服务B: message 2 cleaning up

Process finished with the exit code 0

*/


> 大部分的chan都是单向的

> 这个done可以做成双向的,可以在清理完后,再通知主函数可以正常退出了


### code03 优雅的退出
服务器的优雅退出

package main

import ( “fmt” “math/rand” “time” )

// chan bool 或者 chan struct{} // 这个 chan struct{} 里面没有任何的数据,他比bool更加的省空间 func msgGen(name string, done chan struct{}) <-chan string { c := make(chan string) go func() { i := 0 for { // 判断 done 是否在 select { case <-time.After(time.Millisecond time.Duration(rand.Intn(5000))): c <- fmt.Sprintf(“service %s: message %d”, name, i) case <-done: // 证明我是主动的退出 fmt.Println(“cleaning up”) time.Sleep(2time.Second) fmt.Println(“cleaning over”) //done <- “cleaning over” done<- struct{}{} return } i++ } }() return c }

func fanIn(chs …<-chan string) chan string { c := make(chan string) for _, ch := range chs { //chTemp := ch go func(in <-chan string) { // todo c1,c2 如何调度 for { c <- <-in } }(ch) } return c }

func fanInBySelect(c1, c2 <-chan string) chan string { c := make(chan string) go func() { for { select { case m := <-c1: c <- m case m := <-c2: c <- m } } }() return c }

func nonBlockingWait(c <-chan string) (string, bool) { select { case m := <-c: return m, true default: return “”, false } }

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) { select { case m := <-c: return m, true case <-time.After(timeout): // 没有等到 return “”, false } }

func main() {

done :=make(chan struct{})
// 生成消息
m1 := msgGen("服务A",done)
m2 := msgGen("服务B",done)

for i := 0; i < 5; i++ {
    fmt.Println(<-m1)
    if m, ok := timeoutWait(m2, 1*time.Second); ok {
        fmt.Println(m)
    } else {
        fmt.Println("no message from serve")
        //time.Sleep(2*time.Second)
    }
}
done<- struct{}{}
// 送完停止信号,为了让看到结果,再等他个2s钟
<-done

}

/** service 服务A: message 0 service 服务B: message 0 service 服务A: message 1 no message from serve service 服务A: message 2 service 服务B: message 1 service 服务A: message 3 no message from serve service 服务A: message 4 service 服务B: message 2 cleaning up cleaning over

Process finished with the exit code 0

*/

```