1、设计一个通用的短信服务平台,需要哪些考虑点(难点)?

1、短信模板和签名如何申请: 需要事先在三方平台开通短信账号,在平台中申请签名和模板,通过审核后才可使用 ;
2、在高并发情况下如果保证系统的稳定性(高可用):
搭建短信服务集群,提升服务的的可用性;
使用spring-cloud-stream-rabbitMQ来实现异步发送【流量削峰】来保证其发送稳定性 ;
3、后续对接新渠道的扩展性(高可扩): 需要使用工厂加代理模式构建易于扩展的平台 ;
4、短信发送失败的补发机制(集群容错): 如果三方短信平台欠费或者其他原因导致短信失败后,我们应自动切换其他短信平台继续发送;
5、负载均衡:集群解决负载均衡,算法是由我们自己实现,轮询、随机、一致性hash算法等;
6、渠道路由:类似支付,我们采用适配器设计模式,初始化各个平台实现类到IOC容器中,并存储到一个Map中,根据渠道类型,获取对应的实现类;

2、发送短信的业务流程

1ebe664f6f8e9663f4ba40803a87651.png
image.png
具体业务流程:
1、业务系统调用短信业务系统,此时直接返回发送结果
2、短信业务系统生产发送短信消息到rabbitMQ,此处是做的削峰操作,防止短信发送量过大
3、短信监听从rabbitMQ中拉取短信消息
4、调用SmsSendAdapter适配器过滤黑名单
5、通过负载均衡选择对应渠道
6、查询渠道消息,选择模板
7、查询签名消息
8、通过模板变量兑换参数
9、推送短信消息到三方平台
10、轮询三方平台查看短信发送结果
11、短信业务平台同步发送结果

3、自定义负载均衡算法,及优化过程,以(权重)轮询算法为例(重点)

方案1:
1、获取短信模板对应的渠道及权重比例;
2、使用synchronized同步锁加锁;
3、以频道列表的长度为判断,当索引大于等于集合长度时,重置索引为0;否则,获取当前渠道名称,索引自增
缺点:synchronized同步锁加锁性能较差;

优化1:
采用java.util.concurrent.atomic.AtomicInteger(juc)解决线程安全问题;AtomicInterger基于CAS算法实现占用cpu,使用线程调度解决。使用AtomicInteger.incrementAndGet()替换pos++

优化2:
参考springcloud ribbon负载均衡算法,优化当前代码

  1. private int incrementAndGetModulo(int modulo) {
  2. for (;;) { //自旋锁
  3. int current = nextChannelCyclicCounter.get();
  4. int next = (current + 1) % modulo; // 避免数组越界
  5. if (nextChannelCyclicCounter.compareAndSet(current, next)) // CAS 思想
  6. return next;
  7. }
  8. }
  1. /**
  2. * @ClassName RoundRobin.java
  3. * @Description 轮询(Round Robin)法
  4. * 自定义负载均衡算法:
  5. * * 防止数组越界 (按位与, 对集合大小取模运算)
  6. * * 按照权重设置Map
  7. * * key: 渠道名称
  8. * * value: 权重
  9. * 思想: 取value权重的值,值对应的渠道在集合里添加对应的权重的数量
  10. * 如:
  11. * MAP: {"ali":2,"baidu":1,"tencent":"2"}
  12. * List: ["ali","ali","baidu","tencent","tencent"]
  13. * 在按照轮询/随机算法在获取对应集合下标的值即可
  14. */
  15. @Component
  16. public class RoundRobinSend extends BaseSendLoadBalancer {
  17. private static Integer pos = 0;
  18. @Override
  19. public String chooseChannel(List<SmsTemplate> SmsTemplates, Set<String> mobile) {
  20. //获得当前模板对应的渠道
  21. Map<String, String> channelMap = super.getChannelList(SmsTemplates);
  22. // 取得通道地址List
  23. Set<String> keySet = channelMap.keySet();
  24. ArrayList<String> keyList = new ArrayList<String>();
  25. keyList.addAll(keySet);
  26. String channelName = null;
  27. synchronized (pos) {
  28. if (pos >= keySet.size())
  29. pos = 0;
  30. channelName = keyList.get(pos);
  31. pos ++;
  32. }
  33. return channelName;
  34. }
  35. /**
  36. * 优化1: synchronized大量同步锁,性能不高,使用 AtomicLong对象解决线程安全问题
  37. * 存在的问题: AtomicInteger 基于CAS实现占用CPU, 使用线程调度解决
  38. */
  39. private static AtomicInteger pos = new AtomicInteger(0);
  40. @Override
  41. public String chooseChannel(List<SmsTemplate> smsTemplates, Set<String> mobile) {
  42. //获得当前模板对应的渠道 ALIYUN_SMS 2
  43. Map<String, String> channelMap = super.getChannelList(smsTemplates);
  44. // 取得通道地址List 3
  45. Set<String> keySet = channelMap.keySet();
  46. ArrayList<String> keyList = new ArrayList<String>(keySet);
  47. String channelName = null;
  48. if (pos.incrementAndGet() >= keySet.size())
  49. pos.set(0);
  50. channelName = keyList.get(pos.get());
  51. return channelName;
  52. }
  53. /**
  54. * 优化2: 参考SpringCloud Ribbon 负载均衡算法,优化当前代码
  55. */
  56. private AtomicInteger nextChannelCyclicCounter;
  57. public RoundRobinSend2() {
  58. nextChannelCyclicCounter = new AtomicInteger(0);
  59. }
  60. public String chooseChannel(List<SmsTemplate> smsTemplates, Set<String> mobile) {
  61. if (EmptyUtil.isNullOrEmpty(smsTemplates)) {
  62. log.warn("no load balancer");
  63. return null;
  64. }
  65. //获取所有的渠道列表
  66. Map<String, String> channelMap = super.getChannelList(smsTemplates);
  67. Set<String> keySet = channelMap.keySet();
  68. ArrayList<String> keyList = new ArrayList<>(keySet);
  69. String channelName = null;
  70. int count = 0;
  71. //循环10次后自动跳出循环
  72. while (channelName == null && count++ < 10) {
  73. // 得到所有的渠道集合大小
  74. int upCount = keyList.size();
  75. if (upCount == 0) {
  76. log.warn("No up servers available from load balancer: " + smsTemplates);
  77. return null;
  78. }
  79. //轮询算法实现
  80. int nextServerIndex = incrementAndGetModulo(upCount);
  81. channelName = keyList.get(nextServerIndex);
  82. if (channelName == null) {
  83. /* Transient. */
  84. Thread.yield();
  85. continue;
  86. }
  87. }
  88. if (count >= 10) {
  89. log.warn("No available alive servers after 10 tries from load balancer: ");
  90. }
  91. return channelName;
  92. }
  93. /**
  94. * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
  95. * @param modulo 集合大小
  96. * @return 获取集合中下标
  97. */
  98. private int incrementAndGetModulo(int modulo) {
  99. for (;;) { //自旋锁
  100. int current = nextChannelCyclicCounter.get();
  101. int next = (current + 1) % modulo; // 避免数组越界
  102. if (nextChannelCyclicCounter.compareAndSet(current, next)) // CAS 思想
  103. return next;
  104. }
  105. }
  106. }