NewLife.XCode是一个有15年历史的开源数据中间件,支持netcore/net45/net40,由新生命团队(2002~2020)开发完成并维护至今,以下简称XCode。
整个系列教程会大量结合示例代码和运行日志来进行深入分析,蕴含多年开发经验于其中,代表作有百亿级大数据实时计算项目。
开源地址:https://github.com/NewLifeX/X (求star, 1067+)

在大数据分析处理中,需要对海量数据进行添删改操作,常规单行操作难以满足要求,批量操作势在必行!
飞仙(http://feixian.newlifex.com/)有收藏各种数据库批量插入数据的性能排行榜,其中MySql冠军是60万tps,SQLite冠军是56.6万tps
然而很多时候,数据来自多个渠道(多线程、多网络连接),单个渠道数据量不大,甚至只有一行,就难以使用批量添删改操作了。例如物联网数据采集、埋点日志等,在多线程上有大量数据需要写入。因此,XCode创造性设计了实体队列技术

!!阅读本文之前,建议阅读
语雀内容

什么是实体队列

要说实体队列EntityDeferredQueue,就不得不提它的基类延迟队列DeferredQueue。

延迟队列DeferredQueue的核心思想就是“凑批”,把要处理的零散数据放入一个“队列”,然后定时集中处理
例如物联网采集服务端从多个连接收到数据,需要写入数据库,为了提升吞吐,可以把实体数据放入延迟队列,然后定时的落库,此时,延迟队列得到一批数据,可以使用批量插入技术。

实际上DeferredQueue内部并不是一个队列,而是一个并发字典,因为有些业务场景,需要在“入队列”时去重,例如统计数据,需要拿出某省份的统计数据,多次累加后集中保存。

  1. private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
  2. private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
  3. {
  4. var key = $"{date:yyMMdd}_{provinceID}_{kind}";
  5. var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
  6. stat.StatDate = date;
  7. stat.Kind = kind;
  8. stat.ProvinceID = provinceID;
  9. stat.LastCode = code;
  10. stat.ProcessStat(scanKind);
  11. _statCache.Commit(key);
  12. }

主要流程
对于统计型数据来说,可以在内存里面多次累加计算指标,然后一次性保存,并且是批量保存,极大减少了数据库写入次数。这是大数据分析必备利器!

延迟队列主要属性

  1. /// <summary>跟踪数。达到该值时输出跟踪日志,默认1000</summary>
  2. public Int32 TraceCount { get; set; } = 1000;
  3. /// <summary>周期。默认10_000毫秒</summary>
  4. public Int32 Period { get; set; } = 10_000;
  5. /// <summary>最大个数。超过该个数时,进入队列将产生堵塞。默认100_000</summary>
  6. public Int32 MaxEntity { get; set; } = 100_000;
  7. /// <summary>批大小。默认5_000</summary>
  8. public Int32 BatchSize { get; set; } = 5_000;
  9. /// <summary>等待借出对象确认修改的时间,默认3000ms</summary>
  10. public Int32 WaitForBusy { get; set; } = 3_000;
  11. /// <summary>保存速度,每秒保存多少个实体</summary>
  12. public Int32 Speed { get; private set; }
  13. /// <summary>是否异步处理。默认true表示异步处理,共用DQ定时调度;false表示同步处理,独立线程</summary>
  14. public Boolean Async { get; set; } = true;

回过头来,实体队列EntityDeferredQueue作为延迟队列的扩展延伸,实际上是定义了“队列数据”的处理行为。延迟队列只负责收集数据和定时调度,实际处理行为Process需要扩展。

EntityDeferredQueue定义了 Save/Insert/Update/Upsert/Delete 等行为供选择。