与HTTP消息推送不同的是,MQ消息,是我们主动去获取的
优点:
- 不需要公网IP
- CTWing提供消息缓存(1G),不会因为网络等因素丢数据
- 具有削峰去谷的作用,可承载消息量更大
注意:
- 接收程序启动,对相关定制主题进行一次消费后,该主题消息缓存才能生效
- 用户定制好主题后,应尽快启动接收程序,防止数据丢失。
- 目前每个租户最多能创建10个主题
开发
前期工作
控制台 > MQ消息推送 > 添加topic
- 一个topic提供1G的缓存空间
点击数据源
说明:
- 全部消息:该租户下的所有类型的消息都推送到该主题
- 按规则选择:按需求选择所要配置的产品以及对应的消息类型
编写代码
- 下载MQ消息推送C#.rar(右键->另存为),解压。包中主要包括动态链接库(MQDLL)、demo程序
- 创建一个C#控制台项目,引用MQDLL.dll
- 在NuGet中添加Pulsar.Client程序包(官方例子使用的是2.6.2,也下载这个版本!)
- 编写代码 ```csharp using MQDLL; using System; using System.Collections.Generic; using System.IO; using System.Threading;
namespace MQTest
{
public class CTWingMQ
{
//消息服务server地址
private static string server = “msgpush.ctwing.cn:16651”;
//租户ID
private static string tenantId = “2**6”;
//MQ推送的用户认证token
private static string token = “ey**T9Up4gc”;
//证书文件路径,已内置,无需调整
private static string certFilePath = “”;
//主题
private static List
public CTWingMQ()
{
consumer = new MqMsgConsumer();
IMsgListener msgListener = new MsgListener();
consumer.init(server, tenantId, token, certFilePath, topicNames, msgListener);
}
~CTWingMQ()
{
consumer.destroy();
}
public void start()
{
try
{
consumer.start(); //内部是多线程
}
catch (Exception ex)
{
Console.WriteLine("[Exception]");
Console.WriteLine(ex.Message);
}
}
class MsgListener : IMsgListener
{
private static string outDir = "./out";
public MsgListener()
{
if (!Directory.Exists(outDir))
{
Directory.CreateDirectory(outDir);
}
}
public void onMessage(string msg)
{
//保存时的时间
long timeStamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
StreamWriter sw = File.CreateText(outDir + "/" + timeStamp + ".json");
sw.Write(msg);
sw.Flush();
sw.Close();
Console.WriteLine("[" + timeStamp + "]\t" + msg);
}
}
}
class Program
{
static void Main(string[] args)
{
CTWingMQ tool = new CTWingMQ();
tool.start(); //内部开了多线程
Console.WriteLine("正在监听,按任意键退出。。。");
//等待(堵塞主线程)
Console.ReadKey();
}
}
}
```
测试
缓存功能
先不运行代码,模拟设备上线,发送5条信息
在MQ消息推送中,可以看到有一些数据被缓存下来了
运行代码,成功获得刚才发送的5条数据
遇到一种情况:程序没有运行,已经有缓存
- 程序运行时,取不到缓存里的东西
- 当终端再发一条数据之后,程序才能取到缓存里的数据
实时监听
运行程序,进行实时监听。
在线模拟,上报一条数据