channel 关闭时机不对
func main() {
ch := make(chan int, 1000)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
go func() {
for {
a, ok := <-ch
if !ok {
fmt.Println("close")
return
}
fmt.Println("a: ", a)
}
}()
close(ch)
fmt.Println("ok")
time.Sleep(time.Second * 100)
}
在 golang 中 goroutine
的调度时间是不确定的,在题目中,第一个写 channel
的 goroutine
可能还未调用,或已调用但没有写完时直接 close
管道,可能导致写失败,既然出现 panic
错误。
channel 关闭时机不对2
func main() {
abc := make(chan int, 1000)
for i := 0; i < 10; i++ {
abc <- i
}
go func() {
for {
a := <-abc
fmt.Println("a: ", a)
}
}()
close(abc)
fmt.Println("close")
time.Sleep(time.Second * 100)
}
协程可能还未启动,管道就关闭了。
runtime.GOMAXPROCS(1)
func main() {
runtime.GOMAXPROCS(1)
wg := sync.WaitGroup{}
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
fmt.Println("i: ", i)
wg.Done()
}()
}
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println("i: ", i)
wg.Done()
}(i)
}
wg.Wait()
}
最后输出结果是第一个循环结果不固定,主要看协程的调度时间,如果调度时间很慢,可能是全部是10
。第二个循环会依次输出0-9
。
select case 随机执行
func main() {
runtime.GOMAXPROCS(1)
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
int_chan <- 1
string_chan <- "hello"
select {
case value := <-int_chan:
fmt.Println(value)
case value := <-string_chan:
panic(value)
}
}
结果是随机执行。golang 在多个case
可读的时候会公平的选中一个执行。
defer在定义的时候输出
func calc(index string, a, b int) int {
ret := a + b
fmt.Println(index, a, b, ret)
return ret
}
func main() {
a := 1
b := 2
defer calc("1", a, calc("10", a, b))
a = 0
defer calc("2", a, calc("20", a, b))
b = 1
}
10 1 2 3
20 0 2 2
2 0 2 2
1 1 3 4
defer
在定义的时候会计算好调用函数的参数,所以会优先输出10
、20
两个参数。然后根据定义的顺序倒序执行。
map不安全
type UserAges struct {
ages map[string]int
sync.Mutex
}
func (ua *UserAges) Add(name string, age int) {
ua.Lock()
defer ua.Unlock()
ua.ages[name] = age
}
func (ua *UserAges) Get(name string) int {
if age, ok := ua.ages[name]; ok {
return age
}
return -1
}
在执行 Get方法时可能被panic。
虽然有使用sync.Mutex做写锁,但是map是并发读写不安全的。map属于引用类型,并发读写时多个协程见是通过指针访问同一个地址,即访问共享变量,此时同时读写资源存在竞争关系。会报错误信息:“fatal error: concurrent map read and map write”。
因此,在 Get
中也需要加锁,因为这里只是读,建议使用读写锁 sync.RWMutex
。
无缓冲channel阻塞
func (set *threadSafeSet) Iter() <-chan interface{} {
ch := make(chan interface{})
go func() {
set.RLock()
for elem := range set.s {
ch <- elem
}
close(ch)
set.RUnlock()
}()
return ch
}
默认情况下 make
初始化的 channel
是无缓冲的,也就是在迭代写时会阻塞。
在 golang 协程和channel配合使用
写代码实现两个 goroutine,其中一个产生随机数并写入到 go channel 中,另外一个从 channel 中读取数字并打印到标准输出。最终输出五个随机数。
这是一道很简单的golang基础题目,实现方法也有很多种,一般想答让面试官满意的答案还是有几点注意的地方。
goroutine
在golang中式非阻塞的channel
无缓冲情况下,读写都是阻塞的,且可以用for
循环来读取数据,当管道关闭后,for
退出。- golang 中有专用的
select case
语法从管道读取数据。
func main() {
out := make(chan int)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
out <- rand.Intn(5)
}
close(out)
}()
go func() {
defer wg.Done()
for i := range out {
fmt.Println(i)
}
}()
wg.Wait()
}
实现阻塞读且并发安全的map
GO里面MAP如何实现key不存在 get操作等待 直到key存在或者超时,保证并发安全,且需要实现以下接口:
type sp interface {
Out(key string, val interface{}) //存入key /val,如果该key读取的goroutine挂起,则唤醒。此方法不会阻塞,时刻都可以立即执行并返回
Rd(key string, timeout time.Duration) interface{} //读取一个key,如果key不存在阻塞,等待key存在或者超时
}
看到阻塞协程第一个想到的就是channel
,题目中要求并发安全,那么必须用锁,还要实现多个goroutine
读的时候如果值不存在则阻塞,直到写入值,那么每个键值需要有一个阻塞goroutine
的 channel
。
type Map struct {
c map[string]*entry
rmx *sync.RWMutex
}
type entry struct {
ch chan struct{}
value interface{}
isExist bool
}
func (m *Map) Out(key string, val interface{}) {
m.rmx.Lock()
defer m.rmx.Unlock()
if e, ok := m.c[key]; ok {
e.value = val
e.isExist = true
close(e.ch)
} else {
e = &entry{ch: make(chan struct{}), isExist: true,value:val}
m.c[key] = e
close(e.ch)
}
}
func (m *Map) Rd(key string, timeout time.Duration) interface{} {
m.rmx.Lock()
if e, ok := m.c[key]; ok && e.isExist {
m.rmx.Unlock()
return e.value
} else if !ok {
e = &entry{ch: make(chan struct{}), isExist: false}
m.c[key] = e
m.rmx.Unlock()
fmt.Println("协程阻塞 -> ", key)
select {
case <-e.ch:
return e.value
case <-time.After(timeout):
fmt.Println("协程超时 -> ", key)
return nil
}
} else {
m.rmx.Unlock()
fmt.Println("协程阻塞 -> ", key)
select {
case <-e.ch:
return e.value
case <-time.After(timeout):
fmt.Println("协程超时 -> ", key)
return nil
}
}
}
高并发下的锁与map的读写
场景:在一个高并发的web服务器中,要限制IP的频繁访问。现模拟100个IP同时并发访问服务器,每个IP要重复访问1000次。
每个IP三分钟之内只能访问一次。修改以下代码完成该过程,要求能成功输出 success:100
package main
import (
"fmt"
"time"
)
type Ban struct {
visitIPs map[string]time.Time
}
func NewBan() *Ban {
return &Ban{visitIPs: make(map[string]time.Time)}
}
func (o *Ban) visit(ip string) bool {
if _, ok := o.visitIPs[ip]; ok {
return true
}
o.visitIPs[ip] = time.Now()
return false
}
func main() {
success := 0
ban := NewBan()
for i := 0; i < 1000; i++ {
for j := 0; j < 100; j++ {
go func() {
ip := fmt.Sprintf("192.168.1.%d", j)
if !ban.visit(ip) {
success++
}
}()
}
}
fmt.Println("success:", success)
}
该问题主要考察了并发情况下map的读写问题,而给出的初始代码,又存在for
循环中启动goroutine
时变量使用问题以及goroutine
执行滞后问题。
因此,首先要保证启动的goroutine
得到的参数是正确的,然后保证map
的并发读写,最后保证三分钟只能访问一次。
多CPU核心下修改int
的值极端情况下会存在不同步情况,因此需要原子性的修改int值。
下面给出的实例代码,是启动了一个协程每分钟检查一下map
中的过期ip
,for
启动协程时传参。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
type Ban struct {
visitIPs map[string]time.Time
lock sync.Mutex
}
func NewBan(ctx context.Context) *Ban {
o := &Ban{visitIPs: make(map[string]time.Time)}
go func() {
timer := time.NewTimer(time.Minute * 1)
for {
select {
case <-timer.C:
o.lock.Lock()
for k, v := range o.visitIPs {
if time.Now().Sub(v) >= time.Minute*1 {
delete(o.visitIPs, k)
}
}
o.lock.Unlock()
timer.Reset(time.Minute * 1)
case <-ctx.Done():
return
}
}
}()
return o
}
func (o *Ban) visit(ip string) bool {
o.lock.Lock()
defer o.lock.Unlock()
if _, ok := o.visitIPs[ip]; ok {
return true
}
o.visitIPs[ip] = time.Now()
return false
}
func main() {
success := int64(0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ban := NewBan(ctx)
wait := &sync.WaitGroup{}
wait.Add(1000 * 100)
for i := 0; i < 1000; i++ {
for j := 0; j < 100; j++ {
go func(j int) {
defer wait.Done()
ip := fmt.Sprintf("192.168.1.%d", j)
if !ban.visit(ip) {
atomic.AddInt64(&success, 1)
}
}(j)
}
}
wait.Wait()
fmt.Println("success:", success)
}
写出以下逻辑,要求每秒钟调用一次proc并保证程序不退出?
package main
func main() {
go func() {
// 1 在这里需要你写算法
// 2 要求每秒钟调用一次proc函数
// 3 要求程序不能退出
}()
select {}
}
func proc() {
panic("ok")
}
题目主要考察了两个知识点:
- 定时执行执行任务
- 捕获 panic 错误
题目中要求每秒钟执行一次,首先想到的就是 time.Ticker
对象,该函数可每秒钟往chan
中放一个Time
,正好符合我们的要求。
在 golang
中捕获 panic
一般会用到 recover()
函数。
package main
import (
"fmt"
"time"
)
func main() {
go func() {
// 1 在这里需要你写算法
// 2 要求每秒钟调用一次proc函数
// 3 要求程序不能退出
t := time.NewTicker(time.Second * 1)
for {
select {
case <-t.C:
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
proc()
}()
}
}
}()
select {}
}
func proc() {
panic("ok")
}
为 sync.WaitGroup 中Wait函数支持 WaitTimeout 功能.
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
c := make(chan struct{})
for i := 0; i < 10; i++ {
wg.Add(1)
go func(num int, close <-chan struct{}) {
defer wg.Done()
<-close
fmt.Println(num)
}(i, c)
}
if WaitTimeout(&wg, time.Second*5) {
close(c)
fmt.Println("timeout exit")
}
time.Sleep(time.Second * 10)
}
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
// 要求手写代码
// 要求sync.WaitGroup支持timeout功能
// 如果timeout到了超时时间返回true
// 如果WaitGroup自然结束返回false
}
首先 sync.WaitGroup
对象的 Wait
函数本身是阻塞的,同时,超时用到的time.Timer
对象也需要阻塞的读。
同时阻塞的两个对象肯定要每个启动一个协程,每个协程去处理一个阻塞,难点在于怎么知道哪个阻塞先完成。
目前我用的方式是声明一个没有缓冲的chan
,谁先完成谁优先向管道中写入数据。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
c := make(chan struct{})
for i := 0; i < 10; i++ {
wg.Add(1)
go func(num int, close <-chan struct{}) {
defer wg.Done()
<-close
fmt.Println(num)
}(i, c)
}
if WaitTimeout(&wg, time.Second*5) {
close(c)
fmt.Println("timeout exit")
}
time.Sleep(time.Second * 10)
}
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
// 要求手写代码
// 要求sync.WaitGroup支持timeout功能
// 如果timeout到了超时时间返回true
// 如果WaitGroup自然结束返回false
ch := make(chan bool)
go time.AfterFunc(timeout, func() {
ch <- true
})
go func() {
wg.Wait()
ch <- false
}()
return <- ch
}
这里会有多少个goroutine泄露
package main
import (
"fmt"
"io/ioutil"
"net/http"
"runtime"
)
func main() {
num := 6
for index := 0; index < num; index++ {
resp, _ := http.Get("https://www.baidu.com")
_, _ = ioutil.ReadAll(resp.Body)
}
fmt.Printf("此时goroutine个数= %d\n", runtime.NumGoroutine())
}
每次泄漏一个读和写goroutine,就是12个goroutine,加上main函数
本身也是一个goroutine
,一共有13个goroutine,
- 所以结论呼之欲出了,虽然执行了
6
次循环,而且每次都没有执行Body.Close()
,就是因为执行了ioutil.ReadAll()
把内容都读出来了,连接得以复用,因此只泄漏了一个读goroutine
和一个写goroutine
,最后加上main goroutine
,所以答案就是3个goroutine
。 - 这是用同一个域名的情况下
https://github.com/lifei6671/interview-go/blob/master/question/q015.md