官方文档:https://www.ctwing.cn/dyts/104#see

与HTTP消息推送不同的是,MQ消息,是我们主动去获取的
image.png
优点:

  • 不需要公网IP
  • CTWing提供消息缓存(1G),不会因为网络等因素丢数据
  • 具有削峰去谷的作用,可承载消息量更大

注意:

  1. 接收程序启动,对相关定制主题进行一次消费后,该主题消息缓存才能生效
  2. 用户定制好主题后,应尽快启动接收程序,防止数据丢失。
  3. 目前每个租户最多能创建10个主题

开发

前期工作

控制台 > MQ消息推送 > 添加topic

  1. 一个topic提供1G的缓存空间

image.png

点击数据源
image.png

image.png
说明:

  1. 全部消息:该租户下的所有类型的消息都推送到该主题
  2. 按规则选择:按需求选择所要配置的产品以及对应的消息类型

编写代码

  1. 下载MQ消息推送C#.rar(右键->另存为),解压。包中主要包括动态链接库(MQDLL)、demo程序
  2. 创建一个C#控制台项目,引用MQDLL.dll

image.png

  1. 在NuGet中添加Pulsar.Client程序包(官方例子使用的是2.6.2,也下载这个版本!)
  2. 编写代码 ```csharp using MQDLL; using System; using System.Collections.Generic; using System.IO; using System.Threading;

namespace MQTest { public class CTWingMQ { //消息服务server地址 private static string server = “msgpush.ctwing.cn:16651”; //租户ID private static string tenantId = “2**6”; //MQ推送的用户认证token private static string token = “ey**T9Up4gc”; //证书文件路径,已内置,无需调整 private static string certFilePath = “”; //主题 private static List topicNames = new List() { “datachange” }; //massage consumer IMsgConsumer consumer;

  1. public CTWingMQ()
  2. {
  3. consumer = new MqMsgConsumer();
  4. IMsgListener msgListener = new MsgListener();
  5. consumer.init(server, tenantId, token, certFilePath, topicNames, msgListener);
  6. }
  7. ~CTWingMQ()
  8. {
  9. consumer.destroy();
  10. }
  11. public void start()
  12. {
  13. try
  14. {
  15. consumer.start(); //内部是多线程
  16. }
  17. catch (Exception ex)
  18. {
  19. Console.WriteLine("[Exception]");
  20. Console.WriteLine(ex.Message);
  21. }
  22. }
  23. class MsgListener : IMsgListener
  24. {
  25. private static string outDir = "./out";
  26. public MsgListener()
  27. {
  28. if (!Directory.Exists(outDir))
  29. {
  30. Directory.CreateDirectory(outDir);
  31. }
  32. }
  33. public void onMessage(string msg)
  34. {
  35. //保存时的时间
  36. long timeStamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
  37. StreamWriter sw = File.CreateText(outDir + "/" + timeStamp + ".json");
  38. sw.Write(msg);
  39. sw.Flush();
  40. sw.Close();
  41. Console.WriteLine("[" + timeStamp + "]\t" + msg);
  42. }
  43. }
  44. }
  45. class Program
  46. {
  47. static void Main(string[] args)
  48. {
  49. CTWingMQ tool = new CTWingMQ();
  50. tool.start(); //内部开了多线程
  51. Console.WriteLine("正在监听,按任意键退出。。。");
  52. //等待(堵塞主线程)
  53. Console.ReadKey();
  54. }
  55. }

}

```

测试

缓存功能

先不运行代码,模拟设备上线,发送5条信息
image.png
image.png

在MQ消息推送中,可以看到有一些数据被缓存下来了
image.png
运行代码,成功获得刚才发送的5条数据
image.png

遇到一种情况:程序没有运行,已经有缓存

  1. 程序运行时,取不到缓存里的东西
  2. 当终端再发一条数据之后,程序才能取到缓存里的数据

实时监听

运行程序,进行实时监听。
在线模拟,上报一条数据
image.png