Java SpringCloudSentinel
最近看了下关于分布式限流的部分,看到Sentinel的分布式限流,也就是集群限流的部分,想搭个环境看看,结果发现网上关于这方面的内容基本可以说没有,甚至很难跑起来他的demo,就算能跑起来,估计也得自己研究半天,麻烦的要死。
很重要的原因可能就是Sentinel关于这块做的并不完善,而且从官方的Issue中能看出来,其实官方对于这块后续并没有计划去做的更好。
在此之前,肯定要先说下关于Sentinel集群限流方面的原理,没有原理一切都是空中楼阁。

集群限流原理

原理这方面比较好解释,就是在原本的限流规则中加了一个clusterMode参数,如果是true的话,那么会走集群限流的模式,反之就是单机限流。
如果是集群限流,判断身份是限流客户端还是限流服务端,客户端则和服务端建立通信,所有的限流都通过和服务端的交互来达到效果。
对于Sentinel集群限流,包含两种模式,内嵌式和独立式。

内嵌式

什么是内嵌式呢,简单来说,要限流那么必然要有个服务端去处理多个客户端的限流请求,对于内嵌式来说呢,就是整个微服务集群内部选择一台机器节点作为限流服务端(Sentinel把这个叫做token-server),其他的微服务机器节点作为限流的客户端(token-client),这样的做法有缺点也有优点。
限流-嵌入式
首先说优点:这种方式部署不需要独立部署限流服务端节省独立部署服务端产生的额外服务器开支降低部署和维护复杂度
再说缺点,缺点的话也可以说是整个Sentinel在集群限流这方面做得不够好的问题。
先说第一个缺点:无自动故障转移机制
无论是内嵌式还是独立式的部署方案,都无法做到自动的故障转移。
所有的server和client都需要事先知道IP的请求下做出配置,如果server挂了,需要手动的修改配置,否则集群限流会退化成单机限流。
比如交易服务有3台机器A\B\C,其中A被手动设置为server,B\C则是作为client,当A服务器宕机之后,需要手动修改B\C中一台作为server,否则整个集群的机器都将退化回单机限流的模式。
但是,如果client挂了,则是不会影响到整个集群限流的,比如B挂了,那么A和C将会继续组成集群限流。
如果B再次重启成功,那么又会重新加入到整个集群限流当中来,因为会有一个自动重连的机制,默认的时间是N2秒,逐渐递增的一个时间。
这是想用Sentinel做集群限流并且使用内嵌式需要考虑的问题,要自己去实现自动故障转移的机制,当然,server节点选举也要自己实现了。
对于这个问题,官方提供了可以修改server/client的API接口,另外一个就是可以基于动态的数据源配置方式,这个后面再谈。
第二个缺点:适用于单微服务集群内部限流
这个其实也是显而易见的道理,都内部选举一台作为server去限流了,如果还跨多个微服务的话,显然是不太合理的行为,现实中这种情况肯定也是非常少见的了,当然非要想跨多个微服务集群也不是不可以。
第三个缺点:*server节点的机器性能会受到一定程度的影响

这个肯定也比较好理解的,作为server去限流,那么其他的客户端肯定要和server去通信才能做到集群限流,对不对,所以一定程度上肯定会影响到server节点本身服务的性能,但是应该问题不大,就当server节点多了一个流量比较大的接口好了。
具体上会有多大的影响,没有实际对这块做出实际的测试,如果真的流量非常大,需要实际测试一下这方面的问题。
影响还是可控的,本身server和client基于netty通信,通信的内容其实也非常的小。

独立式

说完内嵌式的这些点,然后再说独立式,也非常好理解,就是单独部署一台机器作为限流服务端server,就不在本身微服务集群内部选一台作为server了。
限流-独立式
很明显,优点就是解决了上面的缺点。

  1. 不会和内嵌式一样,影响到server节点的本身性能
  2. 可以适用于跨多个微服务之间的集群限流

优点可以说就是解决了内嵌式的两个缺点,那么缺点也来了,这同样也是Sentinel本身并没有去解决的问题。
缺点一:需要独立部署,会产生额外的资源(钱)和运维复杂度
缺点二:server默认是单机,需要自己实现高可用方案
缺点二很致命,官方的server实现默认就是单机的,单点问题大家懂的都懂,需要自己实现高可用。
这么说Sentinel这个集群限流就是简单的实现了一下,真正复杂的部分他都没管,可以这么理解。

run起来

那基本原理大概了解之后,还是要真正跑起来看看效果的,毕竟开头就说了,网上这方面真的是感觉啥也搜不到,下面以嵌入式集群的方式举例。
无论集群限流还是单机限流的方式,官方都支持写死配置和动态数据源的配置方式,写的话下面的代码中也都有,被注释掉了,至于动态数据源的配置,会基于Apollo来实现。
理解一下动态数据源的配置方式,基于这个可以实现限流规则的动态刷新,还有重点的一点可以做到基于修改配置方式的半自动故障转移。
动态数据源支持推和拉两种方式,比如文件系统和Eureka就是拉取的方式,定时读取文件内容的变更,Eureka则是建立HTTP连接,定时获取元数据的变更。
推送的方式主要是基于事件监听机制,比如Apollo和Nacos,Redis官方则是基于Pub/Sub来实现,默认的实现方式是基于Lettuce,如果想用其他的客户端要自己实现。
限流-集群工作模式
首先,该引入的包还是引入。

  1. <dependency>
  2. <groupId>com.alibaba.csp</groupId>
  3. <artifactId>sentinel-annotation-aspectj</artifactId>
  4. <version>1.8.4</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba.csp</groupId>
  8. <artifactId>sentinel-transport-simple-http</artifactId>
  9. <version>1.8.4</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.alibaba.csp</groupId>
  13. <artifactId>sentinel-cluster-client-default</artifactId>
  14. <version>1.8.4</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.alibaba.csp</groupId>
  18. <artifactId>sentinel-cluster-server-default</artifactId>
  19. <version>1.8.4</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba.csp</groupId>
  23. <artifactId>sentinel-datasource-apollo</artifactId>
  24. <version>1.8.4</version>
  25. </dependency>

实现SPI,在resources目录的META-INF/services下新增名为com.alibaba.csp.sentinel.init.InitFunc的文件,内容写上自己实现的类名,比如com.fcant.demo.init.DemoClusterInitFunc。
Sentinel集群限流探索 - 图4
实现InitFunc接口,重写init方法,代码直接贴出来,这里整体依赖的是Apollo的配置方式,注释的部分是在测试的时候写死代码的配置方式,也是可以用的。

  1. public class DemoClusterInitFunc implements InitFunc {
  2. private final String namespace = "application";
  3. private final String ruleKey = "demo_sentinel";
  4. private final String ruleServerKey = "demo_cluster";
  5. private final String defaultRuleValue = "[]";
  6. @Override
  7. public void init() throws Exception {
  8. // 初始化 限流规则
  9. initDynamicRuleProperty();
  10. //初始化 客户端配置
  11. initClientConfigProperty();
  12. // 初始化 服务端配置信息
  13. initClientServerAssignProperty();
  14. registerClusterRuleSupplier();
  15. // token-server的传输规则
  16. initServerTransportConfigProperty();
  17. // 初始化 客户端和服务端状态
  18. initStateProperty();
  19. }
  20. /**
  21. * 限流规则和热点限流规则配置
  22. */
  23. private void initDynamicRuleProperty() {
  24. ReadableDataSource<String, List<FlowRule>> ruleSource = new ApolloDataSource<>(namespace, ruleKey,
  25. defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
  26. }));
  27. FlowRuleManager.register2Property(ruleSource.getProperty());
  28. ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new ApolloDataSource<>(namespace, ruleKey,
  29. defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
  30. }));
  31. ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
  32. }
  33. /**
  34. * 客户端配置,注释的部分是通过Apollo配置,只有一个配置就省略了
  35. */
  36. private void initClientConfigProperty() {
  37. // ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(namespace, ruleKey,
  38. // defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {
  39. // }));
  40. // ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
  41. ClusterClientConfig clientConfig = new ClusterClientConfig();
  42. clientConfig.setRequestTimeout(1000);
  43. ClusterClientConfigManager.applyNewConfig(clientConfig);
  44. }
  45. /**
  46. * client->server 传输配置,设置端口号,注释的部分是写死的配置方式
  47. */
  48. private void initServerTransportConfigProperty() {
  49. ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey,
  50. defaultRuleValue, source -> {
  51. List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
  52. });
  53. ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList)
  54. .flatMap(this::extractServerTransportConfig)
  55. .orElse(null);
  56. return serverTransportConfig;
  57. });
  58. ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
  59. // ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort));
  60. }
  61. private void registerClusterRuleSupplier() {
  62. ClusterFlowRuleManager.setPropertySupplier(namespace -> {
  63. ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
  64. defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
  65. }));
  66. return ds.getProperty();
  67. });
  68. ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
  69. ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
  70. defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
  71. }));
  72. return ds.getProperty();
  73. });
  74. }
  75. /**
  76. * 服务端配置,设置server端口和IP,注释的配置是写死的方式,这个在服务端是不用配置的,只有客户端需要配置用来连接服务端
  77. */
  78. private void initClientServerAssignProperty() {
  79. ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey,
  80. defaultRuleValue, source -> {
  81. List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
  82. });
  83. ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList)
  84. .flatMap(this::extractClientAssignment)
  85. .orElse(null);
  86. return clusterClientAssignConfig;
  87. });
  88. ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
  89. // ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig();
  90. // serverConfig.setServerHost("127.0.0.1");
  91. // serverConfig.setServerPort(transPort);
  92. // ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center");
  93. // ClusterClientConfigManager.applyNewAssignConfig(serverConfig);
  94. }
  95. private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
  96. ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get();
  97. Integer currentMachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
  98. if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) {
  99. String ip = tokenServer.getIp();
  100. Integer port = tokenServer.getPort();
  101. return Optional.of(new ClusterClientAssignConfig(ip, port));
  102. }
  103. return Optional.empty();
  104. }
  105. /**
  106. * 初始化客户端和服务端状态,注释的也是写死的配置方式
  107. */
  108. private void initStateProperty() {
  109. ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey,
  110. defaultRuleValue, source -> {
  111. List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
  112. });
  113. Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
  114. return state;
  115. });
  116. ClusterStateManager.registerProperty(clusterModeDs.getProperty());
  117. // ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);
  118. }
  119. private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
  120. return groupList.stream()
  121. .filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER))
  122. .findAny()
  123. .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
  124. }
  125. private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
  126. return getCurrentMachineId().equals(group.getMachineId());
  127. }
  128. private String getCurrentMachineId() {
  129. // 通过-Dcsp.sentinel.api.port=8719 配置, 默认8719,随后递增
  130. return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
  131. }
  132. private static final String SEPARATOR = "@";
  133. }

基础类,定义配置的基础信息。

  1. @Data
  2. public class ClusterGroupEntity {
  3. private String machineId;
  4. private String ip;
  5. private Integer port;
  6. private Integer state;
  7. }

然后是Apollo中的限流规则的配置和server/client集群关系的配置。
需要说明一下的就是flowId,这个是区分限流规则的全局唯一ID,必须要有,否则集群限流会有问题。
thresholdType代表限流模式,默认是0,代表单机均摊,比如这里count限流QPS=20,有3台机器,那么集群限流阈值就是60,如果是1代表全局阈值,也就是count配置的值就是集群限流的上限。

  1. demo_sentinel=[
  2. {
  3. "resource": "test_res", //限流资源名
  4. "count": 20, //集群限流QPS
  5. "clusterMode": true, //true为集群限流模式
  6. "clusterConfig": {
  7. "flowId": 111, //这个必须得有,否则会有问题
  8. "thresholdType": 1 //限流模式,默认为0单机均摊,1是全局阈值
  9. }
  10. }
  11. ]
  12. demo_cluster=[
  13. {
  14. "ip": "192.168.3.20",
  15. "machineId": "192.168.3.20@8720",
  16. "port": 9999, //server和client通信接口
  17. "state": 1 //指定为server
  18. },
  19. {
  20. "ip": "192.168.3.20",
  21. "machineId": "192.168.3.20@8721",
  22. "state": 0
  23. },
  24. {
  25. "ip": "192.168.3.20",
  26. "machineId": "192.168.3.20@8722",
  27. "state": 0
  28. }
  29. ]

OK,到这里代码和配置都已经OK,还需要跑起来Sentinel控制台,这个不用教,还有启动参数。
本地可以直接跑多个客户端,注意修改端口号:-Dserver.port=9100 -Dcsp.sentinel.api.port=8720这两个一块改,至于怎么连Apollo这块就省略了,不行的话用代码里的写死的方式也可以用。

  1. -Dserver.port=9100 -Dcsp.sentinel.api.port=8720 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true

因为有流量之后控制台才能看到限流的情况,所以用官方给的限流测试代码修改一下,放到Springboot启动类中,触发限流规则的初始化。

  1. @SpringBootApplication
  2. public class DemoApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(DemoApplication.class, args);
  5. new FlowQpsDemo();
  6. }
  7. }

测试限流代码:

  1. public class FlowQpsDemo {
  2. private static final String KEY = "test_res";
  3. private static AtomicInteger pass = new AtomicInteger();
  4. private static AtomicInteger block = new AtomicInteger();
  5. private static AtomicInteger total = new AtomicInteger();
  6. private static volatile boolean stop = false;
  7. private static final int threadCount = 32;
  8. private static int seconds = 60 + 40;
  9. public FlowQpsDemo() {
  10. tick();
  11. simulateTraffic();
  12. }
  13. private static void simulateTraffic() {
  14. for (int i = 0; i < threadCount; i++) {
  15. Thread t = new Thread(new RunTask());
  16. t.setName("simulate-traffic-Task");
  17. t.start();
  18. }
  19. }
  20. private static void tick() {
  21. Thread timer = new Thread(new TimerTask());
  22. timer.setName("sentinel-timer-task");
  23. timer.start();
  24. }
  25. static class TimerTask implements Runnable {
  26. @Override
  27. public void run() {
  28. long start = System.currentTimeMillis();
  29. System.out.println("begin to statistic!!!");
  30. long oldTotal = 0;
  31. long oldPass = 0;
  32. long oldBlock = 0;
  33. while (!stop) {
  34. try {
  35. TimeUnit.SECONDS.sleep(1);
  36. } catch (InterruptedException e) {
  37. }
  38. long globalTotal = total.get();
  39. long oneSecondTotal = globalTotal - oldTotal;
  40. oldTotal = globalTotal;
  41. long globalPass = pass.get();
  42. long oneSecondPass = globalPass - oldPass;
  43. oldPass = globalPass;
  44. long globalBlock = block.get();
  45. long oneSecondBlock = globalBlock - oldBlock;
  46. oldBlock = globalBlock;
  47. System.out.println(seconds + " send qps is: " + oneSecondTotal);
  48. System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
  49. + ", pass:" + oneSecondPass
  50. + ", block:" + oneSecondBlock);
  51. if (seconds-- <= 0) {
  52. // stop = true;
  53. }
  54. }
  55. long cost = System.currentTimeMillis() - start;
  56. System.out.println("time cost: " + cost + " ms");
  57. System.out.println("total:" + total.get() + ", pass:" + pass.get()
  58. + ", block:" + block.get());
  59. System.exit(0);
  60. }
  61. }
  62. static class RunTask implements Runnable {
  63. @Override
  64. public void run() {
  65. while (!stop) {
  66. Entry entry = null;
  67. try {
  68. entry = SphU.entry(KEY);
  69. // token acquired, means pass
  70. pass.addAndGet(1);
  71. } catch (BlockException e1) {
  72. block.incrementAndGet();
  73. } catch (Exception e2) {
  74. // biz exception
  75. } finally {
  76. total.incrementAndGet();
  77. if (entry != null) {
  78. entry.exit();
  79. }
  80. }
  81. Random random2 = new Random();
  82. try {
  83. TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
  84. } catch (InterruptedException e) {
  85. // ignore
  86. }
  87. }
  88. }
  89. }
  90. }

启动之后查看控制台,可以看到嵌入式的集群服务端已经启动好。
Sentinel集群限流探索 - 图5
查看限流的情况:
Sentinel集群限流探索 - 图6
最后为了测试效果,再启动一个客户端,修改端口号为9200和8721,可以看到新的客户端已经连接到了服务端,不过这里显示的总QPS 30000和配置的不符,这个不用管他。
Sentinel集群限流探索 - 图7
Sentinel集群限流探索 - 图8
好了,这个就是集群限流原理和使用配置方式,当然了,可以启动多台服务,然后手动修改Apollo中的state参数修改服务端,验证修改配置的方式是否能实现故障转移机制,另外就是关闭client或者server验证是否回退到单机限流的情况,这里就不一一测试了。
对于独立式的部署方式基本也是一样的,只是单独启动一个服务端的服务,需要手动配置server,而嵌入式的则不需要,loadServerNamespaceSet配置为自己的服务名称即可。

  1. ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
  2. ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig()
  3. .setIdleSeconds(600)
  4. .setPort(11111));
  5. ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));
  6. tokenServer.start();