为什么需要mutex
相比于 Go 语言宣扬的“用通讯的方式共享数据”,通过共享数据的方式来传递信息和协调线程运行的做法其实更加主流,毕竟大多数的现代编程语言,都是用后一种方式作为并发编程的解决方案的(这种方案的历史非常悠久,恐怕可以追溯到上个世纪多进程编程时代伊始了)
同步的用途有两个,一个是避免多个线程在同一时刻操作同一个数据块,另一个是协调多个线程,以避免它们在同一时刻执行同一个代码块。
使用互斥锁的注意事项
- 不要重复锁定互斥锁
- 不要忘记解锁互斥锁,必要时使用defer语句
- 不要对尚未锁定或者已解锁的互斥锁解锁
- 不要在多个函数之间直接传递互斥锁
读写锁
- 对于某个受到读写锁保护的共享资源,多个写操作不能同时进行
- 写操作和读操作也不能同时进行,但多个读操作却可以同时进行
- 对写锁进行解锁,会唤醒“所有因试图锁定读锁,而被阻塞的 goroutine”
- 对读锁进行解锁,只会在没有其他读锁锁定的前提下,唤醒“因试图锁定写锁,而被阻塞的 goroutine”;并且,最终只会有一个被唤醒的 goroutine 能够成功完成对写锁的锁定,其他的 goroutine 还要在原处继续等待。至于是哪一个 goroutine,那就要看谁的等待时间最长
package mainimport ("bytes""flag""fmt""io""io/ioutil""log""sync")var protecting uintfunc init() {flag.UintVar(&protecting, "protecting", 0, "It indicates whether to use a mutex to protect data writing.")}func main() {flag.Parse()var buffer bytes.Bufferconst (max1 = 5 // 代表启动goroutine的数量max2 = 10 // 代表每个goruntine需要写入的数据块的数量max3 = 10 // 代表每个数据块有多少重复的数字)var mu sync.Mutexsign := make(chan struct{}, max1)for i := 1; i <= max1; i++ {go func(id int, writer io.Writer) {defer func() {sign <- struct{}{}}()for j := 1; j <= max2; j ++ {header := fmt.Sprintf("\n[id: %d, iteration: %d]", id , j)data := fmt.Sprintf(" %d", id * j)if protecting > 0 {mu.Lock()}_, err := writer.Write([]byte(header))if err != nil {log.Printf("error: %s [%d]", err, id)}for k := 0; k < max3; k++ {_, err := writer.Write([]byte(data))if err != nil {log.Printf("error: %s [%d]", err, id)}}if protecting > 0 {mu.Unlock()}}}(i, &buffer)}for i := 0; i < max1; i++ {<- sign}data, err := ioutil.ReadAll(&buffer)if err != nil {log.Fatalf("fatal error: %s", err)}log.Printf("The contents:\n%s",data)}
package mainimport ("bytes""errors""fmt""io""log""sync""time")// singleHandler 代表处理函数的类型type singleHandler func() (data string, n int, err error)// handlerConfig 代表处理流程配置的类型type handlerConfig struct {handler singleHandler // 单次处理函数goNum int // 需要启用的goroutine的数量number int // 单个goroutine中处理次数interval time.Duration // 单个goroutine中处理的时间间隔counter int // 数据量计数器,以字节为单位counterMu sync.Mutex // 数据量计数器专用互斥锁}// count 会增加计数器的值,并会返回增加后的计数func (hc *handlerConfig) count(increment int) int {hc.counterMu.Lock()defer hc.counterMu.Unlock()hc.counter += incrementreturn hc.counter}func main() {// mu 代表以下流程要使用的互斥锁var mu sync.Mutex// genWriter 代表的是用于生成写入函数的函数genWriter := func(writer io.Writer) singleHandler {return func() (data string, n int, err error) {// 准备数据data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano))// 写入数据mu.Lock()defer mu.Unlock()n, err = writer.Write([]byte(data))return}}// genReader 代表的是用于生成读取函数的函数genReader := func(reader io.Reader) singleHandler{return func() (data string, n int, err error) {buffer, ok := reader.(*bytes.Buffer)if !ok {err = errors.New("unsupported reader")return}// 读取数据mu.Lock()defer mu.Unlock()data, err = buffer.ReadString('\t')n = len(data)return}}// buffer 代表缓冲区var buffer bytes.Buffer// 数据写入配置writingConfig := handlerConfig {handler: genWriter(&buffer),goNum: 5,number: 4,interval: time.Millisecond * 100,}// sign 代表信号的通道readingConfig := handlerConfig{handler: genReader(&buffer),goNum: 10,number: 2,interval: time.Millisecond * 100,}// sign 代表信号的通道sign := make(chan struct{}, writingConfig.goNum + readingConfig.goNum)// 启动多个goroutine对换重组进行多次数据写入for i := 1; i <= writingConfig.goNum; i++ {go func(i int) {defer func() {sign <- struct{}{}}()for j := 1; j <= writingConfig.number; j ++ {time.Sleep(writingConfig.interval)data, n, err := writingConfig.handler()if err != nil {log.Printf("writer [%d-%d]: error: %s", i, j, err)continue}total := writingConfig.count(n)log.Printf("writer [%d-%d]: %s (total: %d)", i, j, data, total)}}(i)}// 启用多个goroutine对缓冲区进行多次数据读取for i := 1; i <= readingConfig.goNum; i++ {go func(i int){defer func() {sign <- struct{}{}}()for j := 1; j<= readingConfig.number; j ++ {time.Sleep(readingConfig.interval)var data stringvar n intvar err errorfor {data, n, err = readingConfig.handler()if err == nil || err != io.EOF {break}// 如果读比写快(读时会发生EOF错误),那就等一会再读time.Sleep(readingConfig.interval)}if err != nil {log.Printf("read [%d-%d]: error: %s", i, j, err)continue}total := readingConfig.count(n)log.Printf("reader [%d-%d]: %s (totl: %d)", i, j, data, total)}}(i)}// signNumber 代表需要接受信号的数量signNumber := writingConfig.goNum + readingConfig.goNum// 等待上面启用的所有goroutine的运行全部结束for j := 0; j < signNumber; j ++ {<-sign}}
