主要是服务端订阅情况下怎么部署
从并发角度应该是rabbitmq消息队列方式部署的,因为智能设备总是在线的,对传统电商行业来讲是高并发场景,在电商中涉及到库存和订单重复支付等问题,对设备来讲存在控制命令结果是否正确的问题。

1-设备上报压力测试工具

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. mqtt "github.com/eclipse/paho.mqtt.golang"
  6. "sync"
  7. )
  8. var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  9. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  10. }
  11. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  12. fmt.Println("Connected")
  13. }
  14. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  15. fmt.Printf("Connect lost: %v", err)
  16. }
  17. func main() {
  18. var broker = "xxxxx.mqtt.iothub.aliyuncs.com"
  19. var port = 1883
  20. opts := mqtt.NewClientOptions()
  21. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
  22. opts.SetClientID(" ")
  23. opts.SetUsername(" ")
  24. opts.SetPassword("")
  25. opts.SetDefaultPublishHandler(messagePubHandler)
  26. opts.OnConnect = connectHandler
  27. opts.OnConnectionLost = connectLostHandler
  28. client := mqtt.NewClient(opts)
  29. if token := client.Connect(); token.Wait() && token.Error() != nil {
  30. panic(token.Error())
  31. }
  32. data := make(map[string]interface{})
  33. detail := make(map[string]interface{})
  34. detail["asset_id"] = "xxx"
  35. detail["event_time"] = "xxx"
  36. detail["client_sn"] = ""
  37. detail["result"] = "xxx"
  38. data["method"] = "thing.event.display_event.post"
  39. data["params"] = detail
  40. num := 2000
  41. dataStr, _ := json.Marshal(data)
  42. var wg sync.WaitGroup
  43. for i := 0; i < num; i++ {
  44. wg.Add(1)
  45. go func() {
  46. defer wg.Done()
  47. //qos=0不用等待mqtt server回复是否接收到 只管往里面推
  48. token := client.Publish("/sys/xxx/xxxx/thing/event/display_event/post", 0, false, dataStr)
  49. _ = token.Wait() // Can also use '<-t.Done()' in releases > 1.2.0
  50. if token.Error() != nil {
  51. fmt.Println(token.Error())
  52. } else {
  53. fmt.Println("发布消息成功!")
  54. }
  55. }()
  56. }
  57. wg.Wait()
  58. }

go模拟设备上报。利用go协程实现并发可以压力测试mqtt server或者模拟线上设备并发的脚本