网址:https://gobea.cn/

一、Go HTTP POST 发送文件流

  1. package main
  2. import (
  3. "net/http"
  4. "net/url"
  5. "fmt"
  6. "io/ioutil"
  7. _ "io"
  8. "bytes"
  9. )
  10. func main() {
  11. postFile()
  12. }
  // post sends a form-encoded POST request to a local endpoint and prints the
  // response body. Any failure is printed and ends the function early.
  func post() {
      // Endpoint that echoes the posted parameters back.
      endpoint := "http://localhost:8080/aaa"
      resp, err := http.PostForm(endpoint, url.Values{"num": {"456"}, "num1": {"123"}})
      if err != nil {
          // The response is nil when the request fails, so stop before touching its body.
          fmt.Println("err=", err)
          return
      }
      defer func() {
          resp.Body.Close()
          fmt.Println("finish")
      }()
      body, err := ioutil.ReadAll(resp.Body)
      if err != nil {
          fmt.Println(" post err=", err)
          return
      }
      fmt.Println(string(body))
  }
  // postFile reads post.txt from the working directory, POSTs its bytes to a
  // local endpoint and prints the response body. Any failure is printed and
  // ends the function early.
  func postFile() {
      // Endpoint that echoes the posted parameters back.
      endpoint := "http://localhost:8080/aaa"
      data, err := ioutil.ReadFile("post.txt")
      if err != nil {
          fmt.Println("err=", err)
          return
      }
      // NOTE(review): the body is the raw file content, yet the header announces
      // multipart/form-data without a boundary; confirm the intended content type.
      resp, err := http.Post(endpoint, "multipart/form-data", bytes.NewReader(data))
      if err != nil {
          // The response is nil when the request fails, so stop before touching its body.
          fmt.Println("err=", err)
          return
      }
      defer func() {
          resp.Body.Close()
          fmt.Println("finish")
      }()
      body, err := ioutil.ReadAll(resp.Body)
      if err != nil {
          fmt.Println(" post err=", err)
          return
      }
      fmt.Println(string(body))
  }

二、sync.Map

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type Class struct {
  7. Students sync.Map
  8. }
  9. func handler(key, value interface{}) bool {
  10. fmt.Printf("Name :%s %s\n", key, value)
  11. return true
  12. }
  13. func main() {
  14. class := &Class{}
  15. // Store a few values.
  16. class.Students.Store("Zhao", "class 1")
  17. class.Students.Store("Qian", "class 2")
  18. class.Students.Store("Sun", "class 3")
  19. // Iterate over the map; Range stops as soon as the callback returns false.
  20. class.Students.Range(handler)
  21. // Look up a key.
  22. if _, ok := class.Students.Load("Li"); !ok {
  23. fmt.Println("-->Li not found")
  24. }
  25. // Load the key, or store it when it is absent.
  26. _, loaded := class.Students.LoadOrStore("Li", "class 4")
  27. if loaded {
  28. fmt.Println("-->Load Li success")
  29. } else {
  30. fmt.Println("-->Store Li success")
  31. }
  32. // Delete a key.
  33. class.Students.Delete("Sun")
  34. fmt.Println("-->Delete Sun success")
  35. // Iterate again.
  36. class.Students.Range(handler)
  37. }

三、路由器

  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. )
  6. type MyMux struct {
  7. }
  8. func (p *MyMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  9. if r.URL.Path == "/" {
  10. sayhelloName(w, r)
  11. return
  12. }
  13. http.NotFound(w, r)
  14. return
  15. }
  16. func sayhelloName(w http.ResponseWriter, r *http.Request) {
  17. fmt.Fprintf(w, "Hello myroute!")
  18. }
  19. func main() {
  20. mux := &MyMux{}
  21. http.ListenAndServe(":9090", mux)
  22. }

四、解析yaml文件

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "gopkg.in/yaml.v2"
  6. )
  // data is the sample YAML document. counter and plist are nested under desc
  // so that they map onto the Desc field of T.
  var data = `
blog: xiaorui.cc
best_authors: ["fengyun","lee","park"]
desc:
    counter: 521
    plist: [3, 4]
`
  // T mirrors the YAML document held in data.
  type T struct {
      Blog    string
      Authors []string `yaml:"best_authors,flow"`
      Desc    struct {
          // The document spells the key "counter"; YAML keys are case-sensitive,
          // so the tag has to use the same lowercase spelling.
          Counter int   `yaml:"counter"`
          Plist   []int `yaml:",flow"`
      }
  }
  22. func main() {
  23. t := T{}
  24. //把yaml形式的字符串解析成struct类型
  25. err := yaml.Unmarshal([]byte(data), &t)
  26. //修改struct里面的记录
  27. t.Blog = "this is Blog"
  28. t.Authors = append(t.Authors, "myself")
  29. t.Desc.Counter = 99
  30. fmt.Printf("--- t:\n%v\n\n", t)
  31. //转换成yaml字符串类型
  32. d, err := yaml.Marshal(&t)
  33. if err != nil {
  34. log.Fatalf("error: %v", err)
  35. }
  36. fmt.Printf("--- t dump:\n%s\n\n", string(d))
  37. }

五、mq

  1. package main
  2. import (
  3. "flag"
  4. "log"
  5. "strconv"
  6. "time"
  7. "github.com/nats-io/nats.go"
  8. "github.com/pborman/uuid"
  9. )
  10. const (
  11. //url = "nats://192.168.3.125:4222"
  12. url = nats.DefaultURL
  13. )
  14. var (
  15. nc *nats.Conn
  16. err error
  17. )
  18. func init() {
  19. // if nc, err = nats.Connect(url); checkErr(err) {
  20. // fmt.
  21. // }
  22. }
  23. func main() {
  24. }
  25. //发送消息
  26. func sendMessage() {
  27. var (
  28. subj = flag.String("subj", "", "subject name")
  29. )
  30. flag.Parse()
  31. log.Println(*subj)
  32. startClient(*subj)
  33. time.Sleep(time.Second)
  34. }
  35. func startClient(subj string) {
  36. for i := 0; i < 1; i++ {
  37. id := uuid.New()
  38. log.Println(id)
  39. nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i)))
  40. nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))
  41. nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))
  42. nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))
  43. }
  44. }
  45. //接收消息
  46. func receiveMessage() {
  47. var (
  48. servername = flag.String("servername", "y", "name for server")
  49. queueGroup = flag.String("group", "", "group name for Subscribe")
  50. subj = flag.String("subj", "", "subject name")
  51. )
  52. flag.Parse()
  53. log.Println(*servername, *queueGroup, *subj)
  54. startService(*subj, *servername+" worker1", *queueGroup)
  55. startService(*subj, *servername+" worker2", *queueGroup)
  56. startService(*subj, *servername+" worker3", *queueGroup)
  57. select {}
  58. }
  59. func startService(subj, name, queue string) {
  60. go async(nc, subj, name, queue)
  61. }
  62. func async(nc *nats.Conn, subj, name, queue string) {
  63. nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
  64. log.Println(name, "Received a message From Async : ", string(msg.Data))
  65. })
  66. }
  67. func checkErr(err error) bool {
  68. if err != nil {
  69. log.Println(err)
  70. return false
  71. }
  72. return true
  73. }
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  // failOnError does nothing when err is nil; otherwise it logs "msg: err" and
  // terminates the program.
  func failOnError(err error, msg string) {
      if err == nil {
          return
      }
      // log.Fatal exits the process, so a panic placed after it could never run.
      log.Fatal(fmt.Errorf("%s: %w", msg, err))
  }
  13. func main() {
  14. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  15. failOnError(err, "Failed to connect to RabbitMQ")
  16. defer conn.Close()
  17. ch, err := conn.Channel()
  18. failOnError(err, "Failed to open a channel")
  19. defer ch.Close()
  20. failOnError(err, "Failed to declare a queue")
  21. err = ch.Qos(
  22. 1, // prefetch count
  23. 0, // prefetch size
  24. false, //global
  25. )
  26. failOnError(err, "Failed to set Qos")
  27. //发送消息
  28. funcSend(ch)
  29. //收消息
  30. funcRecv(ch)
  31. }
  32. //生产者
  33. func funcSend(ch *amqp.Channel) {
  34. q, err := ch.QueueDeclare(
  35. "queue", //name
  36. false, //durables
  37. false, //delete when unused
  38. false, //exclusive
  39. false, //no wait
  40. nil, //args
  41. )
  42. failOnError(err, "Failed to declare a queue")
  43. body := "Hello World!"
  44. error := ch.Publish(
  45. "", // exchange
  46. q.Name, // routing key
  47. false, // mandatory
  48. false, // immediate
  49. amqp.Publishing{
  50. ContentType: "text/plain",
  51. Body: []byte(body),
  52. })
  53. log.Printf("[x] Sent %s", body)
  54. failOnError(error, "Failed to publish a message")
  55. }
  56. //消费者
  57. func funcRecv(ch *amqp.Channel) {
  58. q, err := ch.QueueDeclare(
  59. "queue", // name
  60. false, // durable
  61. false, // delete when unused
  62. false, // exclusive
  63. false, // no-wait
  64. nil, // arguments
  65. )
  66. failOnError(err, "Failed to declare a queue")
  67. msgs, err := ch.Consume(
  68. q.Name, // queue
  69. "", // consumer
  70. true, // auto-ack
  71. false, // exclusive
  72. false, // no-local
  73. false, // no-wait
  74. nil, // args
  75. )
  76. failOnError(err, "Failed to register a consumer")
  77. forever := make(chan bool)
  78. go func() {
  79. for d := range msgs {
  80. log.Printf("Received a message: %s", d.Body)
  81. }
  82. }()
  83. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  84. <-forever
  85. }

六、map

  1. package main
  2. import "fmt"
  3. import "sync"
  4. import "time"
  // main runs two SyncTest goroutines against one shared sync.Map.
  func main() {
      var mTest sync.Map
      // Pass a pointer: a sync.Map must not be copied after first use, and
      // passing it by value would hand each goroutine its own private map.
      go SyncTest(&mTest)
      go SyncTest(&mTest)
      // Crude wait so both goroutines can finish before main returns.
      time.Sleep(time.Second * 20)
  }

  // SyncTest exercises Load, Store, Delete and LoadOrStore on mTest 1000 times,
  // printing the result of every lookup.
  func SyncTest(mTest *sync.Map) {
      for j := 0; j < 1000; j++ {
          fmt.Println(mTest.Load("A")) // Load reads a key.
          mTest.Store("A", "a")        // Store writes a key.
          fmt.Println(mTest.Load("A"))
          mTest.Delete("A") // Delete removes a key.
          fmt.Println(mTest.Load("A"))
          mTest.LoadOrStore("B", "b") // LoadOrStore stores only when the key is absent.
          fmt.Println(mTest.Load("B"))
      }
  }
  22. func funcMap() {
  23. // Declare the map first.
  24. var m1 map[string]string
  25. // Then create a non-nil map with make; a nil map cannot be assigned to.
  26. m1 = make(map[string]string)
  27. // Finally assign entries to the declared map.
  28. m1["a"] = "aa"
  29. m1["b"] = "bb"
  30. // Create directly.
  31. m2 := make(map[string]string)
  32. // Then assign entries.
  33. m2["a"] = "aa"
  34. m2["b"] = "bb"
  35. // Initialise and assign in a single step.
  36. m3 := map[string]string{
  37. "a": "aa",
  38. "b": "bb",
  39. }
  40. fmt.Println(m3)
  41. // ==========================================
  42. // Check whether a key exists.
  43. if v, ok := m1["a"]; ok {
  44. fmt.Println(v)
  45. } else {
  46. fmt.Println("Key Not Found")
  47. }
  48. // Iterate over the map.
  49. for k, v := range m1 {
  50. fmt.Println(k, v)
  51. }
  52. }

七、多路复用

  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. )
  6. // 设置多个处理器函数
  7. func handler1(w http.ResponseWriter, r *http.Request) {
  8. fmt.Fprintf(w, "1 欢迎访问 www.ydook.com !")
  9. }
  10. func handler2(w http.ResponseWriter, r *http.Request) {
  11. fmt.Fprintf(w, "2 欢迎访问 www.ydook.com !")
  12. }
  13. func handler3(w http.ResponseWriter, r *http.Request) {
  14. fmt.Fprintf(w, "3 欢迎访问 www.ydook.com !")
  15. }
  16. func main() {
  17. // 设置多路复用处理函数
  18. mux := http.NewServeMux()
  19. mux.HandleFunc("/h1", handler1)
  20. mux.HandleFunc("/h2", handler2)
  21. mux.HandleFunc("/h3", handler3)
  22. // 设置服务器
  23. server := &http.Server{
  24. Addr: "127.0.0.1:8000",
  25. Handler: mux,
  26. }
  27. // 设置服务器监听请求端口
  28. server.ListenAndServe()
  29. }

八、for循环

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. str := []string{"I","am","Sergey"}
  8. for _,v := range str{
  9. go func() { // Pitfall demo: the closure captures the loop variable v itself, not its current value.
  10. fmt.Println(v) // Before Go 1.22 every iteration shares one v, so the goroutines may all print the last element.
  11. }()
  12. }
  13. time.Sleep(3 * time.Second) // Crude wait so the goroutines get a chance to run before main returns.
  14. }
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. str := []string{"I","am","Sergey"}
  8. for _,v := range str{
  9. go func(v string) {
  10. fmt.Println(v)
  11. }(v)
  12. }
  13. time.Sleep(3 * time.Second)
  14. }
  1. package main
  2. import "fmt"
  3. func main() {
  4. slice := []int{0, 1, 2, 3}
  5. myMap := make(map[int]*int)
  6. for index , value := range slice {
  7. myMap[index] = &value // Pitfall demo: before Go 1.22 value is a single variable reused by every iteration, so all entries alias it.
  8. }
  9. prtMap(myMap)
  10. }
  11. func prtMap(myMap map[int]*int) {
  12. for key, value := range myMap {
  13. fmt.Printf("map[%v]=%v\n", key, *value)
  14. }
  15. }
  1. package main
  2. import "fmt"
  3. func main() {
  4. slice := []int{0, 1, 2, 3}
  5. myMap := make(map[int]*int)
  6. for index , value := range slice {
  7. v := value
  8. myMap[index] = &v
  9. }
  10. prtMap(myMap)
  11. }
  12. func prtMap(myMap map[int]*int) {
  13. for key, value := range myMap {
  14. fmt.Printf("map[%v]=%v\n", key, *value)
  15. }
  16. }

九、Stringer接口

  1. package main
  2. import "fmt"
  3. type Person struct {
  4. Name string
  5. Age int
  6. }
  7. func (p Person) String() string {
  8. return fmt.Sprintf("%v (%v years)", p.Name, p.Age)
  9. }
  10. func main() {
  11. a := Person{"Arthur Dent", 42}
  12. z := Person{"Zaphod Beeblebrox", 9001}
  13. fmt.Println(a, z)
  14. }

十、抽象类实现

  1. type IPeople interface {
  2. GetName() string
  3. SetName(string)
  4. GetAge() int
  5. SetAge(int)
  6. Run()
  7. }
  8. type AbstractPeople struct {
  9. IPeople // Embedded interface: IPeople methods not defined on AbstractPeople (Run, in the visible listing) are forwarded to this value.
  10. name string
  11. age int
  12. }
  13. func (a AbstractPeople) GetName() string {
  14. return a.name
  15. }
  16. func (a *AbstractPeople) SetName(newName string) {
  17. a.name = newName
  18. }
  19. func (a AbstractPeople) GetAge() int {
  20. return a.age
  21. }
  22. func (a *AbstractPeople) SetAge(newAge int) {
  23. a.age = newAge
  24. }
  25. // user.
  26. func UsePeople(p IPeople) {
  27. fmt.Println(p.GetName())
  28. fmt.Println(p.GetAge())
  29. p.SetName("Alice")
  30. p.SetAge(p.GetAge() + 1)
  31. p.Run()
  32. }

十一、接口

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. // 定义一个数据写入器
  6. type DataWriter interface {
  7. WriteData(data interface{}) error
  8. }
  9. // 定义文件结构,用于实现DataWriter
  10. type file struct {
  11. }
  12. // 实现DataWriter接口的WriteData方法
  13. func (d *file) WriteData(data interface{}) error {
  14. // 模拟写入数据
  15. fmt.Println("WriteData:", data)
  16. return nil
  17. }
  18. func main() {
  19. // Instantiate file.
  20. f := new(file)
  21. // Declare a variable of the DataWriter interface type.
  22. var writer DataWriter
  23. // Assign f, which has type *file, to the interface.
  24. writer = f
  25. // Write data through the DataWriter interface.
  26. writer.WriteData("data")
  27. }

十二、多线程

  1. package main
  import (
      "fmt"
      "runtime"
      "sync"
  )
  var (
      // cpunum is the number of worker goroutines: one fewer than the CPU
      // count, but never less than 1. With zero workers nothing would receive
      // from the unbuffered channel and main would block on its first send.
      cpunum = func() int {
          if n := runtime.NumCPU() - 1; n > 0 {
              return n
          }
          return 1
      }()
  )
  9. func main(){
  10. ch := make(chan string)
  11. runtime.GOMAXPROCS(cpunum)
  12. wg := sync.WaitGroup{}
  13. for i := 0; i < cpunum; i++{
  14. go WgReadLogs(ch, &wg)
  15. }
  16. wg.Add(2)
  17. ch <- "./health/stat1.rec"
  18. ch <- "./report/stat2.rec"
  19. wg.Wait()
  20. close(ch)
  21. }
  22. func WgReadLogs(ch chan string,wg *sync.WaitGroup){
  23. for true{
  24. tmp,ok := <-ch
  25. if !ok{
  26. break
  27. }
  28. ReadLogs(tmp)
  29. wg.Done()
  30. }
  31. }
  32. func ReadLogs(logname string){
  33. fmt.Println(logname)
  34. }

十三、websocket

  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "unsafe"
  8. "golang.org/x/net/websocket"
  9. )
  // BytePtrToString copies the NUL-terminated byte sequence starting at p into
  // a Go string. A nil p yields the empty string. The caller must guarantee
  // that the memory behind p really contains a terminating zero byte.
  func BytePtrToString(p *byte) string {
      if p == nil {
          return ""
      }
      var buf bytes.Buffer
      // Advance one byte at a time; the uintptr arithmetic stays inside a single
      // expression, as the unsafe.Pointer conversion rules require.
      for cur := p; *cur != 0; cur = (*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(cur)) + 1)) {
          buf.WriteByte(*cur)
      }
      return buf.String()
  }
  17. func main() {
  18. fmt.Println("websocket地址: ws://127.0.0.1:8080/runtime")
  19. http.Handle("/runtime", websocket.Handler(echo))
  20. if err := http.ListenAndServe(":8080", nil); err != nil {
  21. log.Fatal(err)
  22. }
  23. }
  24. //测试地址: http://coolaf.com/tool/chattest 首字母大写是公有的 Echo,首字母小写是私有的 echo
  25. func echo(w *websocket.Conn) {
  26. var error error
  27. for {
  28. //只支持string类型
  29. var reply string
  30. if error = websocket.Message.Receive(w, &reply); error != nil {
  31. log.Println("websocket出现异常", error)
  32. break
  33. }
  34. fmt.Println("收到客户端消息:" + reply)
  35. msg := reply + ", 我是服务端"
  36. fmt.Println("发送客户端消息:" + msg)
  37. if error = websocket.Message.Send(w, msg); error != nil {
  38. log.Println("websocket出现异常", error)
  39. break
  40. }
  41. }
  42. }

十四、协程池

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // 任务的属性应该是一个业务函数
  7. type Task struct {
  8. f func() error // 函数名f, 无参,返回值为error
  9. }
  10. // 创建Task任务
  11. func NewTask(arg_f func() error) *Task {
  12. task := Task{
  13. f: arg_f,
  14. }
  15. return &task
  16. }
  17. // Task绑定业务方法
  18. func (task *Task) Execute() {
  19. task.f() // 调用任务中已经绑定好的业务方法
  20. }
  21. // ------------------------------------------------
  22. type Pool struct {
  23. EntryChannel chan *Task // 对外的Task入口
  24. JobsChannel chan *Task // 内部的Task队列
  25. workerNum int // 协程池中最大的woker数量
  26. }
  27. // 创建Pool
  28. func NewPool(cap int) *Pool {
  29. pool := Pool{
  30. EntryChannel: make(chan *Task),
  31. JobsChannel: make(chan *Task),
  32. workerNum: cap,
  33. }
  34. return &pool
  35. }
  36. // Pool绑定干活的方法
  37. func (pool *Pool) worker(workID int) {
  38. // worker工作 : 永久从JobsChannel取任务 然后执行任务
  39. for task := range pool.JobsChannel {
  40. task.Execute()
  41. fmt.Println("work ID ", workID, " has executed")
  42. }
  43. }
  44. // Pool绑定协程池工作方法
  45. func (pool *Pool) run() {
  46. // 定义worker数量
  47. for i := 0; i < pool.workerNum; i++ {
  48. go pool.worker(i)
  49. }
  50. // 从EntryChannel去任务,发送给JobsChannel
  51. for task := range pool.EntryChannel {
  52. pool.JobsChannel <- task // 添加task优先级排序逻辑
  53. }
  54. }
  55. // ------------------------------------------------
  56. func main() {
  57. // 创建一些任务
  58. task := NewTask(func() error { // 匿名函数
  59. fmt.Println(time.Now())
  60. return nil
  61. })
  62. // 创建协程池
  63. pool := NewPool(4)
  64. // 创建多任务,抛给协程池
  65. go func() { // 开启新的协程,防止阻塞
  66. for {
  67. pool.EntryChannel <- task
  68. }
  69. }()
  70. // 启动协程池
  71. pool.run()
  72. }
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. // worker is the function run by every goroutine.
  8. // Note that the WaitGroup must be passed to the function by pointer.
  9. func worker(id int, wg *sync.WaitGroup) {
  10. fmt.Printf("Worker %d starting\n", id)
  11. // Sleep for one second to simulate an expensive task.
  12. time.Sleep(time.Second)
  13. fmt.Printf("Worker %d done\n", id)
  14. // Tell the WaitGroup that this goroutine has finished its work.
  15. wg.Done()
  16. }
  17. func main() {
  18. // This WaitGroup is used to wait for all goroutines started by this function.
  19. var wg sync.WaitGroup
  20. // Start several goroutines, incrementing the WaitGroup counter for each one.
  21. for i := 1; i <= 5; i++ {
  22. wg.Add(1)
  23. go worker(i, &wg)
  24. }
  25. // Block until the WaitGroup counter is back to 0, i.e. every goroutine has finished.
  26. wg.Wait()
  27. }
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // worker is run concurrently in several instances.
  7. // Each worker receives tasks from the `jobs` channel and sends the corresponding result on `results`.
  8. // Every task sleeps for 1s to imitate an expensive operation.
  9. func worker(id int, jobs <-chan int, results chan<- int) {
  10. for j := range jobs {
  11. fmt.Println("worker", id, "processing job", j)
  12. time.Sleep(time.Second)
  13. results <- j * 2
  14. }
  15. }
  16. func main() {
  17. // 为了使用 worker 线程池并且收集他们的结果,我们需要 2 个通道。
  18. jobs := make(chan int, 100)
  19. results := make(chan int, 100)
  20. // 这里启动了 3 个 worker,初始是阻塞的,因为还没有传递任务。
  21. for w := 1; w <= 3; w++ {
  22. go worker(w, jobs, results)
  23. }
  24. // 这里我们发送 9 个 `jobs`,然后 `close` 这些通道
  25. // 来表示这些就是所有的任务了。
  26. for j := 1; j <= 9; j++ {
  27. jobs <- j
  28. }
  29. close(jobs)
  30. // 最后,我们收集所有这些任务的返回值。
  31. for a := 1; a <= 9; a++ {
  32. <-results
  33. }
  34. }
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "sync/atomic"
  7. pool "github.com/jolestar/go-commons-pool"
  8. )
  9. func Example_simple() {
  10. type myPoolObject struct {
  11. s string
  12. }
  13. v := uint64(0)
  14. factory := pool.NewPooledObjectFactorySimple(
  15. func(context.Context) (interface{}, error) {
  16. return &myPoolObject{
  17. s: strconv.FormatUint(atomic.AddUint64(&v, 1), 10),
  18. },
  19. nil
  20. })
  21. ctx := context.Background()
  22. p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
  23. obj, err := p.BorrowObject(ctx)
  24. if err != nil {
  25. panic(err)
  26. }
  27. o := obj.(*myPoolObject)
  28. fmt.Println(o.s)
  29. err = p.ReturnObject(ctx, obj)
  30. if err != nil {
  31. panic(err)
  32. }
  33. // Output: 1
  34. }
  1. package main
  2. import (
  3. "io/ioutil"
  4. "net/http"
  5. "runtime"
  6. "github.com/Jeffail/tunny"
  7. )
  8. func main() {
  9. numCPUs := runtime.NumCPU()
  10. pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
  11. var result []byte
  12. // TODO: Something CPU heavy with payload
  13. return result
  14. })
  15. defer pool.Close()
  16. http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
  17. input, err := ioutil.ReadAll(r.Body)
  18. if err != nil {
  19. http.Error(w, "Internal error", http.StatusInternalServerError)
  20. }
  21. defer r.Body.Close()
  22. // Funnel this work into our pool. This call is synchronous and will
  23. // block until the job is completed.
  24. result := pool.Process(input)
  25. w.Write(result.([]byte))
  26. })
  27. http.ListenAndServe(":8080", nil)
  28. }
  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. var (
  11. // ErrInvalidPoolCap return if pool size <= 0
  12. ErrInvalidPoolCap = errors.New("invalid pool cap")
  13. // ErrPoolAlreadyClosed put task but pool already closed
  14. ErrPoolAlreadyClosed = errors.New("pool already closed")
  15. )
  16. const (
  17. // RUNNING pool is running
  18. RUNNING = 1
  19. // STOPED pool is stoped
  20. STOPED = 0
  21. )
  22. // Task task to-do
  23. type Task struct {
  24. Handler func(v ...interface{})
  25. Params []interface{}
  26. }
  27. // Pool task pool
  28. type Pool struct {
  29. capacity uint64
  30. runningWorkers uint64
  31. state int64
  32. taskC chan *Task
  33. PanicHandler func(interface{})
  34. sync.Mutex
  35. }
  36. // NewPool init pool
  37. func NewPool(capacity uint64) (*Pool, error) {
  38. if capacity <= 0 {
  39. return nil, ErrInvalidPoolCap
  40. }
  41. return &Pool{
  42. capacity: capacity,
  43. state: RUNNING,
  44. taskC: make(chan *Task, capacity),
  45. }, nil
  46. }
  47. // GetCap get capacity
  48. func (p *Pool) GetCap() uint64 {
  49. return p.capacity
  50. }
  51. // GetRunningWorkers get running workers
  52. func (p *Pool) GetRunningWorkers() uint64 {
  53. return atomic.LoadUint64(&p.runningWorkers)
  54. }
  55. func (p *Pool) incRunning() {
  56. atomic.AddUint64(&p.runningWorkers, 1)
  57. }
  58. func (p *Pool) decRunning() {
  59. atomic.AddUint64(&p.runningWorkers, ^uint64(0))
  60. }
  61. // Put put a task to pool
  62. func (p *Pool) Put(task *Task) error {
  63. if p.getState() == STOPED {
  64. return ErrPoolAlreadyClosed
  65. }
  66. // safe run worker
  67. p.Lock()
  68. if p.GetRunningWorkers() < p.GetCap() {
  69. p.run()
  70. }
  71. p.Unlock()
  72. // send task safe
  73. p.Lock()
  74. if p.state == RUNNING {
  75. p.taskC <- task
  76. }
  77. p.Unlock()
  78. return nil
  79. }
  80. func (p *Pool) run() {
  81. p.incRunning()
  82. go func() {
  83. defer func() {
  84. p.decRunning()
  85. if r := recover(); r != nil {
  86. if p.PanicHandler != nil {
  87. p.PanicHandler(r)
  88. } else {
  89. log.Printf("Worker panic: %s\n", r)
  90. }
  91. }
  92. }()
  93. for {
  94. select {
  95. case task, ok := <-p.taskC:
  96. if !ok {
  97. return
  98. }
  99. task.Handler(task.Params...)
  100. }
  101. }
  102. }()
  103. }
  104. func (p *Pool) getState() int64 {
  105. p.Lock()
  106. defer p.Unlock()
  107. return p.state
  108. }
  109. func (p *Pool) setState(state int64) {
  110. p.Lock()
  111. defer p.Unlock()
  112. p.state = state
  113. }
  114. // close safe
  115. func (p *Pool) close() {
  116. p.Lock()
  117. defer p.Unlock()
  118. close(p.taskC)
  119. }
  120. // Close close pool graceful
  121. func (p *Pool) Close() {
  122. if p.getState() == STOPED {
  123. return
  124. }
  125. p.setState(STOPED) // stop put task
  126. for len(p.taskC) > 0 { // wait all task be consumed
  127. time.Sleep(1e6) // reduce CPU load
  128. }
  129. p.close()
  130. }
  131. //https://github.com/wazsmwazsm/mortar
  132. func main() {
  133. // 创建容量为 10 的任务池
  134. pool, err := NewPool(10)
  135. if err != nil {
  136. panic(err)
  137. }
  138. wg := new(sync.WaitGroup)
  139. for i := 0; i < 1000; i++ {
  140. wg.Add(1)
  141. // 创建任务
  142. task := &Task{
  143. Handler: func(v ...interface{}) {
  144. wg.Done()
  145. fmt.Println(v)
  146. },
  147. }
  148. // 添加任务函数的参数
  149. task.Params = []interface{}{i, i * 2, "hello"}
  150. // 将任务放入任务池
  151. pool.Put(task)
  152. }
  153. wg.Add(1)
  154. // 再创建一个任务
  155. pool.Put(&Task{
  156. Handler: func(v ...interface{}) {
  157. wg.Done()
  158. fmt.Println(v)
  159. },
  160. Params: []interface{}{"hi!"}, // 也可以在创建任务时设置参数
  161. })
  162. wg.Wait()
  163. // 安全关闭任务池(保证已加入池中的任务被消费完)
  164. pool.Close()
  165. // 如果任务池已经关闭, Put() 方法会返回 ErrPoolAlreadyClosed 错误
  166. err = pool.Put(&Task{
  167. Handler: func(v ...interface{}) {},
  168. })
  169. if err != nil {
  170. fmt.Println(err) // print: pool already closed
  171. }
  172. }

十五、go调用C语言

注意:
rpc(远程函数调用)提供client端通过网络调用远程server端的函数的服务。
rpc-server端需要提供较高的吞吐能力,支持较大的并发连接。

  • epoll监听多个连接fd,实现IO复用

    1. 1)epoll没有最大并发连接的限制,上限是最大可以打开文件的数目,一般远大于2048<br /> 2)epoll效率高,只管活跃的连接,而跟连接总数无关<br /> 3)使用共享内存,省去了内存拷贝
  • 生产者&消费者模式对请求任务进行管理

    1. 一个IO-thread线程维护连接池中的连接,连接有3种状态:NO_USED、READY、BUSY。<br /> 1)NO_USED状态,fd位置空闲,连接位尚未使用<br /> 2)READY状态,已经与client端建立连接的fd,等待请求事件<br /> 3)BUSY状态,已经收到请求事件,将fd放入任务队列,等待work-thread处理

```go
package main

/*
// NOTE(review): the header names were lost in extraction. stdio.h is required
// by printf; stdlib.h is an assumption - confirm against the original.
#include <stdio.h>
#include <stdlib.h>

void say(){ printf("hello world\n"); }
*/
import "C"

import (
    "fmt"
    "net/http"
    "runtime"
    "syscall"
    "time"

    "github.com/gin-gonic/gin"
)

func main() {
    runtime.LockOSThread()
    router := gin.Default()
    router.GET("/ping", func(c *gin.Context) {
        fmt.Println("worker", syscall.Getegid())
        c.JSON(200, gin.H{
            "message": "pong",
        })
    })
    s := &http.Server{
        Addr:           ":18282",
        Handler:        router,
        ReadTimeout:    600 * time.Second,
        WriteTimeout:   600 * time.Second,
        MaxHeaderBytes: 1 << 20,
    }
    fmt.Println("listen:", syscall.Getegid())
    s.ListenAndServe()
    fmt.Println("main", C.say())
}

func testMain() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    ch1 := make(chan bool)
    ch2 := make(chan bool)
    fmt.Println("main", C.say())
    go func() {
        runtime.LockOSThread()
        fmt.Println("locked", C.say())
        go func() {
            fmt.Println("locked child", C.say())
            ch1 <- true
        }()
        ch2 <- true
    }()
    <-ch1
    <-ch2
}
```

<a name="AlQ0C"></a>
十六、go RPC

  4. package main
  5. import (
  6. "context"
  7. "log"
  8. "net"
  9. "os"
  10. "time"
  11. "google.golang.org/grpc"
  12. pb "google.golang.org/grpc/examples/helloworld/helloworld"
  13. "google.golang.org/grpc/reflection"
  14. )
  15. func main() {
  16. println(f(1))
  17. }
  18. const (
  19. port = ":50051"
  20. address = "localhost:50051"
  21. defaultName = "world"
  22. )
  23. type server struct{} //服务对象
  24. // SayHello 实现服务的接口 在proto中定义的所有服务都是接口
  25. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  26. return &pb.HelloReply{Message: "Hello " + in.Name}, nil
  27. }
  28. func serverMain() {
  29. lis, err := net.Listen("tcp", port)
  30. if err != nil {
  31. log.Fatalf("failed to listen: %v", err)
  32. }
  33. s := grpc.NewServer() //起一个服务
  34. pb.RegisterGreeterServer(s, &server{})
  35. // 注册反射服务 这个服务是CLI使用的 跟服务本身没有关系
  36. reflection.Register(s)
  37. if err := s.Serve(lis); err != nil {
  38. log.Fatalf("failed to serve: %v", err)
  39. }
  40. }
  41. func clientMain() {
  42. //建立链接
  43. conn, err := grpc.Dial(address, grpc.WithInsecure())
  44. if err != nil {
  45. log.Fatalf("did not connect: %v", err)
  46. }
  47. defer conn.Close()
  48. c := pb.NewGreeterClient(conn)
  49. // Contact the server and print out its response.
  50. name := defaultName
  51. if len(os.Args) > 1 {
  52. name = os.Args[1]
  53. }
  54. // 1秒的上下文
  55. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  56. defer cancel()
  57. r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
  58. if err != nil {
  59. log.Fatalf("could not greet: %v", err)
  60. }
  61. log.Printf("Greeting: %s", r.Message)
  62. }
  // f returns (0, x). The first result is named with the blank identifier,
  // so nothing can be stored in it and it keeps its zero value; only the
  // second result carries x.
  func f(x int) (_, __ int) {
      return 0, x
  }

十七、go调用IoT物联网

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. mqtt "github.com/eclipse/paho.mqtt.golang"
  7. )
  8. const broker = "tcp://你的服务器.mqtt.iot.gz.baidubce.com:1883"
  9. const username = "你的服务器/client"
  10. const password = "UJq**密码**msq"
  11. const ClientID = "随意"
  12. //message的回调
  13. var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  14. fmt.Printf("[%s] -> %s\n", msg.Topic(), msg.Payload())
  15. }
  16. var wg sync.WaitGroup
  17. var client mqtt.Client
  18. func main() {
  19. //连接MQTT服务器
  20. mqttConnect()
  21. defer client.Disconnect(250) //注册销毁
  22. wg.Add(1)
  23. go mqttSubScribe("test1")
  24. wg.Add(1)
  25. go testPublish()
  26. wg.Wait()
  27. }
  28. func mqttConnect() {
  29. //配置
  30. clinetOptions := mqtt.NewClientOptions().AddBroker(broker).SetUsername(username).SetPassword(password)
  31. clinetOptions.SetClientID(ClientID)
  32. clinetOptions.SetConnectTimeout(time.Duration(60) * time.Second)
  33. //连接
  34. client = mqtt.NewClient(clinetOptions)
  35. //客户端连接判断
  36. if token := client.Connect(); token.WaitTimeout(time.Duration(60)*time.Second) && token.Wait() && token.Error() != nil {
  37. panic(token.Error())
  38. }
  39. }
  40. func mqttSubScribe(topic string) {
  41. defer wg.Done()
  42. for {
  43. token := client.Subscribe(topic, 1, onMessage)
  44. token.Wait()
  45. }
  46. }
  47. //测试 3秒发送一次,然后自己接收
  48. func testPublish() {
  49. defer wg.Done()
  50. for {
  51. client.Publish("test1", 1, false, "TEST")
  52. time.Sleep(time.Duration(3) * time.Second)
  53. }
  54. }
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "time"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. )
  11. var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  12. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  13. }
  14. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  15. fmt.Println("Connected")
  16. }
  17. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  18. fmt.Printf("Connect lost: %v", err)
  19. }
  20. //https://cloud.emqx.cn/console/deployments/0?oper=new
  21. // ClientOptions:用于设置 broker,端口,客户端 id ,用户名密码等选项
  22. // messagePubHandler:全局 MQTT pub 消息处理
  23. // connectHandler:连接的回调
  24. // connectLostHandler:连接丢失的回调
  25. func main() {
  26. var broker = "broker.emqx.io"
  27. var port = 1883
  28. opts := mqtt.NewClientOptions()
  29. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
  30. opts.SetClientID("go_mqtt_client")
  31. opts.SetUsername("emqx")
  32. opts.SetPassword("public")
  33. opts.SetDefaultPublishHandler(messagePubHandler)
  34. opts.OnConnect = connectHandler
  35. opts.OnConnectionLost = connectLostHandler
  36. client := mqtt.NewClient(opts)
  37. if token := client.Connect(); token.Wait() && token.Error() != nil {
  38. panic(token.Error())
  39. }
  40. sub(client)
  41. publish(client)
  42. client.Disconnect(250)
  43. }
  44. //发布消息
  45. func publish(client mqtt.Client) {
  46. num := 10
  47. for i := 0; i < num; i++ {
  48. text := fmt.Sprintf("Message %d", i)
  49. token := client.Publish("topic/test", 0, false, text)
  50. token.Wait()
  51. time.Sleep(time.Second)
  52. }
  53. }
  54. //订阅
  55. func sub(client mqtt.Client) {
  56. topic := "topic/test"
  57. token := client.Subscribe(topic, 1, nil)
  58. token.Wait()
  59. fmt.Printf("Subscribed to topic: %s", topic)
  60. }
  // NewTlsConfig builds the TLS configuration to use for a TLS connection
  // with a client certificate.
  //
  // It reads the CA bundle from "ca.pem" and the client key pair from
  // "client-crt.pem" / "client-key.pem" in the working directory. An
  // unreadable or certificate-less ca.pem ends the process via log.Fatalln;
  // an unloadable key pair panics.
  //
  // Server certificate verification is left enabled. The original set
  // InsecureSkipVerify to true, which made the RootCAs loaded here pointless
  // and allowed any server to impersonate the broker. ClientAuth and
  // ClientCAs were dropped: both were set to their zero values.
  func NewTlsConfig() *tls.Config {
      certpool := x509.NewCertPool()
      ca, err := ioutil.ReadFile("ca.pem")
      if err != nil {
          log.Fatalln(err.Error())
      }
      // AppendCertsFromPEM reports false when nothing could be parsed; an
      // empty pool would make every handshake fail later with a vaguer error.
      if !certpool.AppendCertsFromPEM(ca) {
          log.Fatalln("ca.pem: no valid PEM certificate found")
      }

      // Import client certificate/key pair.
      clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
      if err != nil {
          panic(err)
      }

      return &tls.Config{
          RootCAs:      certpool,
          Certificates: []tls.Certificate{clientKeyPair},
      }
  }
  // NewTlsConfigs builds the TLS configuration to use when no client
  // certificate is configured.
  //
  // It reads the CA bundle from "ca.pem" in the working directory; an
  // unreadable or certificate-less file ends the process via log.Fatalln.
  //
  // Server certificate verification is left enabled. The original set
  // InsecureSkipVerify to true, which made the RootCAs loaded here pointless
  // and allowed any server to impersonate the broker. ClientAuth and
  // ClientCAs were dropped: both were set to their zero values.
  func NewTlsConfigs() *tls.Config {
      certpool := x509.NewCertPool()
      ca, err := ioutil.ReadFile("ca.pem")
      if err != nil {
          log.Fatalln(err.Error())
      }
      // AppendCertsFromPEM reports false when nothing could be parsed; an
      // empty pool would make every handshake fail later with a vaguer error.
      if !certpool.AppendCertsFromPEM(ca) {
          log.Fatalln("ca.pem: no valid PEM certificate found")
      }

      return &tls.Config{
          RootCAs: certpool,
      }
  }

十八、乐观锁 参考:https://github.com/crossoverJie/gorm-optimistic

Quick start

  1. func BenchmarkUpdateWithOptimistic(b *testing.B) {
  2. dsn := "root:abc123@/test?charset=utf8&parseTime=True&loc=Local"
  3. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  4. if err != nil {
  5. fmt.Println(err)
  6. return
  7. }
  8. b.RunParallel(func(pb *testing.PB) {
  9. var out Optimistic
  10. db.First(&out, Optimistic{Id: 1})
  11. out.Amount = out.Amount + 10
  12. err = UpdateWithOptimistic(db, &out, func(model Lock) Lock {
  13. bizModel := model.(*Optimistic)
  14. bizModel.Amount = bizModel.Amount + 10
  15. return bizModel
  16. }, 5, 0)
  17. if err != nil {
  18. fmt.Println(err)
  19. }
  20. })
  21. }

Model

  // Optimistic is the row model for table "t_optimistic". The Version field
  // is exposed through GetVersion and SetVersion.
  type Optimistic struct {
      Id      int64   `gorm:"column:id;primary_key;AUTO_INCREMENT" json:"id"`
      UserId  string  `gorm:"column:user_id;default:0;NOT NULL" json:"user_id"` // user ID
      Amount  float32 `gorm:"column:amount;NOT NULL" json:"amount"`             // amount
      Version int64   `gorm:"column:version;default:0;NOT NULL" json:"version"` // version
  }

  // TableName returns the name of the table this model maps to.
  func (opt *Optimistic) TableName() string {
      const table = "t_optimistic"
      return table
  }

  // GetVersion returns the current value of the Version field.
  func (opt *Optimistic) GetVersion() int64 {
      current := opt.Version
      return current
  }

  // SetVersion stores version in the Version field.
  func (opt *Optimistic) SetVersion(version int64) {
      opt.Version = version
  }

十九、并发条件下的有序输出

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  // main starts one producer that sends 1..100 on an unbuffered channel and
  // three consumers that print whatever they receive. It returns once the
  // producer has closed the channel and every consumer has drained it.
  func main() {
      q := make(chan int)
      wg := new(sync.WaitGroup)

      // Producer.
      wg.Add(1)
      go func() {
          defer wg.Done()
          defer close(q)
          for n := 1; n <= 100; n++ {
              q <- n
              // The short sleep paces the producer so that each value is
              // printed before the next is sent; this is not a reliable
              // ordering mechanism.
              time.Sleep(time.Microsecond)
          }
      }()

      // Consumers: each prints values until the channel is closed.
      for worker := 0; worker < 3; worker++ {
          wg.Add(1)
          go func() {
              defer wg.Done()
              for v := range q {
                  fmt.Println(v)
              }
          }()
      }

      wg.Wait()
  }
  34. // 利用WaitGroup实现安全退出,通过让生产者休眠来控制输出顺序;但如果消费者的操作耗时不稳定,这种方案就不可靠
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  // print writes every value received from ch to stdout and returns when ch
  // is closed. The original looped forever and, on a closed channel, would
  // have printed zeros endlessly. The name shadows the builtin print inside
  // this program.
  func print(ch <-chan int) {
      for v := range ch {
          fmt.Println(v)
      }
  }

  // main distributes 1..100 over three channels by i%3, each served by its
  // own printing goroutine, then closes the channels and waits for the
  // printers to finish. The original returned right after the last send, so
  // the final value could be lost when the process exited first.
  func main() {
      var chans [3]chan int
      done := make(chan struct{})
      for k := range chans {
          chans[k] = make(chan int)
          go func(ch <-chan int) {
              print(ch)
              done <- struct{}{}
          }(chans[k])
      }

      for i := 1; i < 101; i++ {
          chans[i%3] <- i
          // Pace the producer so each value is printed before the next is
          // sent; as in the original, this ordering is best-effort only.
          time.Sleep(time.Microsecond)
      }

      // Signal end of input, then wait until every printer has drained.
      for _, ch := range chans {
          close(ch)
      }
      for range chans {
          <-done
      }
  }
  30. // 利用for循环实现安全退出,同样通过让生产者休眠来控制输出顺序;但如果消费者的操作耗时不稳定,这种方案就不可靠
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  // main prints 1..100 in order from three goroutines coordinated by a
  // sync.Cond: a goroutine sleeps in Wait until main hands out a "ticket",
  // then receives and prints exactly one value while holding the lock.
  //
  // Changes from the original listing:
  //   - Wait sits in a predicate loop over a ticket counter, so a Signal sent
  //     while no goroutine is waiting is no longer lost
  //   - after the last ticket, the goroutines are woken with Broadcast and
  //     exit; the original left them blocked in Wait forever
  //   - a WaitGroup replaces the two one-second sleeps for start-up and exit
  func main() {
      q := make(chan int, 100)
      // Fill the channel.
      for i := 1; i <= 100; i++ {
          q <- i
      }

      cond := sync.NewCond(&sync.Mutex{})
      pending := 0      // tickets handed out but not yet consumed; guarded by cond.L
      finished := false // set once no more tickets will be issued; guarded by cond.L

      var workers sync.WaitGroup
      for i := 0; i < 3; i++ {
          workers.Add(1)
          go func() {
              defer workers.Done()
              for {
                  cond.L.Lock()
                  for pending == 0 && !finished {
                      cond.Wait()
                  }
                  if pending == 0 {
                      // finished and nothing left to print
                      cond.L.Unlock()
                      return
                  }
                  pending--
                  v, ok := <-q
                  if !ok {
                      cond.L.Unlock()
                      return
                  }
                  // Receiving and printing under the lock keeps the output
                  // in channel order.
                  fmt.Println(v)
                  cond.L.Unlock()
              }
          }()
      }

      for i := 1; i <= 100; i++ {
          cond.L.Lock()
          pending++
          cond.L.Unlock()
          cond.Signal() // wake one waiting goroutine
          time.Sleep(time.Microsecond)
      }
      close(q)

      // Tell the goroutines that no more tickets will come and wake all of
      // them so they can exit.
      cond.L.Lock()
      finished = true
      cond.L.Unlock()
      cond.Broadcast()
      workers.Wait()
  }
  37. // 利用cond使所有goroutine进入阻塞状态,直到被唤醒,以此来实现有序输出;
  38. // cond和其他锁不一样,临界资源被锁之后goroutine不会进行轮询,而是等待调度,减少资源开销
  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  // main prints 1..100 in order from three goroutines that share one
  // producer channel. A mutex makes "receive then print" a single critical
  // section, so values are printed in the order the producer sent them.
  //
  // Changes from the original listing:
  //   - main returns as soon as the consumers have drained the channel
  //     instead of always sleeping a full second; the one-second limit is
  //     kept as an upper bound
  //   - the mutex is named mu (the original called it wg)
  func main() {
      q := make(chan int)
      mu := new(sync.Mutex)

      // Producer: sends 1..100, then closes the channel.
      go func() {
          defer close(q)
          for i := 1; i <= 100; i++ {
              q <- i
          }
      }()

      var workers sync.WaitGroup
      for i := 0; i < 3; i++ {
          workers.Add(1)
          go func() {
              defer workers.Done()
              for {
                  mu.Lock()
                  v, ok := <-q
                  if !ok {
                      mu.Unlock()
                      return
                  }
                  fmt.Println(v)
                  mu.Unlock()
              }
          }()
      }

      // Safe exit: wait for the consumers, but never longer than the
      // original fixed delay.
      done := make(chan struct{})
      go func() {
          workers.Wait()
          close(done)
      }()
      select {
      case <-done:
      case <-time.After(time.Second):
      }
  }
  32. // 使用互斥锁阻塞其他协程来实现抢占,使用休眠实现安全退出;
  33. // 这种方案是可靠的,但最好能够使用WaitGroup来实现安全退出
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  // main prints 1..100 in order from three goroutines that take turns
  // through a one-slot buffered channel used as a semaphore: sending
  // acquires the slot, receiving releases it.
  //
  // Changes from the original listing:
  //   - a goroutine that sees n > 100 releases the slot before it stops; the
  //     original broke out while still holding it, blocking the others
  //     forever
  //   - the channel is no longer closed on return; closing it while
  //     goroutines were blocked sending could panic them
  //   - main returns as soon as all three goroutines are done, with the
  //     original one-second delay kept as an upper bound
  func main() {
      a := 0
      fn := func() int {
          a++
          return a
      }

      ch := make(chan int, 1)        // one-slot semaphore guarding a and the output
      done := make(chan struct{}, 3) // one completion signal per goroutine

      for i := 0; i < 3; i++ {
          go func() {
              defer func() { done <- struct{}{} }()
              for {
                  ch <- 1
                  n := fn()
                  if n > 100 {
                      <-ch
                      return
                  }
                  fmt.Println(n)
                  <-ch
              }
          }()
      }

      // Safe exit: wait for the goroutines, but never longer than the
      // original fixed delay.
      timeout := time.After(time.Second)
      for i := 0; i < 3; i++ {
          select {
          case <-done:
          case <-timeout:
              return
          }
      }
  }
  29. // 使用管道来阻塞其他协程来实现抢占,使用休眠实现安全退出;
  30. // 这种方案是可靠的,同理,最好能够使用WaitGroup来实现安全退出

go.mod

  1. module datacenter
  2. go 1.15
  3. require (
  4. github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480
  5. github.com/brianvoe/gofakeit/v6 v6.0.0
  6. github.com/chanxuehong/wechat v0.0.0-20201110083048-0180211b69fd
  7. github.com/coreos/go-semver v0.3.0 // indirect
  8. github.com/dgrijalva/jwt-go v3.2.0+incompatible
  9. github.com/fortytw2/leaktest v1.3.0 // indirect
  10. github.com/gin-gonic/gin v1.6.3
  11. github.com/go-playground/validator/v10 v10.4.1 // indirect
  12. github.com/go-redis/redis v6.15.9+incompatible
  13. github.com/go-sql-driver/mysql v1.5.0
  14. github.com/gobwas/httphead v0.1.0 // indirect
  15. github.com/gobwas/pool v0.2.1 // indirect
  16. github.com/gobwas/ws v1.0.3
  17. github.com/golang/protobuf v1.4.2
  18. github.com/gotomicro/ego v0.3.7
  19. github.com/jmoiron/sqlx v1.3.1
  20. github.com/jolestar/go-commons-pool v2.0.0+incompatible
  21. github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
  22. github.com/nats-io/nats-server/v2 v2.1.7 // indirect
  23. github.com/nats-io/nats.go v1.10.0
  24. github.com/pborman/uuid v1.2.1
  25. github.com/qiniu/api.v7/v7 v7.7.0
  26. github.com/spf13/pflag v1.0.5 // indirect
  27. github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
  28. github.com/tal-tech/go-zero v1.0.28
  29. github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
  30. golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
  31. golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
  32. golang.org/x/time v0.0.0-20191024005414-555d28b269f0
  33. google.golang.org/grpc v1.29.1
  34. google.golang.org/protobuf v1.25.0
  35. github.com/eclipse/paho.mqtt.golang v1.2.0
  36. )