Sentinel

简介

分布式系统流量控制框架
分布式系统中存在很多应用,应用之间的调用链路很长。如果突发大流量访问导致某个服务挂掉,而又没有进行流量控制,整个系统可能会出现服务雪崩:下游系统崩溃会造成上游系统级联崩溃,最终导致整个系统不可用。
官方文档 链接

sentinel 可以用来做什么?

  1. 流量控制,流量整形
  2. 熔断
  3. 应用监控

优势:
与 hystrix 相比,hystrix 已经不再维护,sentinel 社区活跃,支持各种场景,比如 feign, dubbo, grpc。入门很简单,代码零侵入实现限流和熔断。

功能

SpringCloud Sentinel - 图1

生态

SpringCloud Sentinel - 图2

使用 Demo

依赖,主pom 文件添加:

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  4. <version>2.0.3.RELEASE</version>
  5. <type>pom</type>
  6. <scope>import</scope>
  7. </dependency>

service pom 添加:

  1. <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-sentinel -->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  5. </dependency>

配置文件

  1. spring.cloud.sentinel.enabled=true
  2. spring.cloud.sentinel.transport.dashboard=127.0.0.1:8080
  3. spring.cloud.sentinel.transport.port=8719
  4. spring.cloud.sentinel.transport.client-ip= 127.0.0.1
  5. spring.cloud.sentinel.transport.heartbeat-interval-ms=5000
  6. feign.sentinel.enabled=true

控制台 jar 包 : 链接
下载下来后,指定端口启动(注意:控制台的启动端口需要与上面 spring.cloud.sentinel.transport.dashboard 配置的端口保持一致,否则客户端无法连接到控制台)

  1. java -Dserver.port=9561 \
  2. -Dcsp.sentinel.dashboard.server=localhost:9561 \
  3. -jar target/sentinel-dashboard.jar

访问服务后,Dashboard 可以显示实时的请求,并可以对相应的资源配置限流和熔断规则。
image.png
对某个接口配置熔断:
image.png

熔断效果:
image.png

原理

使用责任链模式,把限流,统计,熔断等功能通过责任链串起来,主要做数据统计和各种判断。底层做统计的数据结构是时间滑动窗口。
image.png
滑动窗口的实现:

  1. package com.alibaba.csp.sentinel.slots.statistic.base;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.atomic.AtomicReferenceArray;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. import com.alibaba.csp.sentinel.util.AssertUtil;
  7. import com.alibaba.csp.sentinel.util.TimeUtil;
  8. /**
  9. * <p>
  10. * Basic data structure for statistic metrics in Sentinel.
  11. * </p>
  12. * <p>
  13. * Leap array use sliding window algorithm to count data. Each bucket cover {@code windowLengthInMs} time span,
  14. * and the total time span is {@link #intervalInMs}, so the total bucket amount is:
  15. * {@code sampleCount = intervalInMs / windowLengthInMs}.
  16. * </p>
  17. *
  18. * @param <T> type of statistic data
  19. * @author jialiang.linjl
  20. * @author Eric Zhao
  21. * @author Carpenter Lee
  22. */
  23. public abstract class LeapArray<T> {
  24. protected int windowLengthInMs;
  25. protected int sampleCount;
  26. protected int intervalInMs;
  27. private double intervalInSecond;
  28. protected final AtomicReferenceArray<WindowWrap<T>> array;
  29. /**
  30. * The conditional (predicate) update lock is used only when current bucket is deprecated.
  31. */
  32. private final ReentrantLock updateLock = new ReentrantLock();
  33. /**
  34. * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
  35. *
  36. * @param sampleCount bucket count of the sliding window
  37. * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
  38. */
  39. public LeapArray(int sampleCount, int intervalInMs) {
  40. AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
  41. AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
  42. AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
  43. this.windowLengthInMs = intervalInMs / sampleCount;
  44. this.intervalInMs = intervalInMs;
  45. this.intervalInSecond = intervalInMs / 1000.0;
  46. this.sampleCount = sampleCount;
  47. this.array = new AtomicReferenceArray<>(sampleCount);
  48. }
  49. /**
  50. * Get the bucket at current timestamp.
  51. *
  52. * @return the bucket at current timestamp
  53. */
  54. public WindowWrap<T> currentWindow() {
  55. return currentWindow(TimeUtil.currentTimeMillis());
  56. }
  57. /**
  58. * Create a new statistic value for bucket.
  59. *
  60. * @param timeMillis current time in milliseconds
  61. * @return the new empty bucket
  62. */
  63. public abstract T newEmptyBucket(long timeMillis);
  64. /**
  65. * Reset given bucket to provided start time and reset the value.
  66. *
  67. * @param startTime the start time of the bucket in milliseconds
  68. * @param windowWrap current bucket
  69. * @return new clean bucket at given start time
  70. */
  71. protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
  72. private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
  73. long timeId = timeMillis / windowLengthInMs;
  74. // Calculate current index so we can map the timestamp to the leap array.
  75. return (int)(timeId % array.length());
  76. }
  77. protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
  78. return timeMillis - timeMillis % windowLengthInMs;
  79. }
  80. /**
  81. * Get bucket item at provided timestamp.
  82. *
  83. * @param timeMillis a valid timestamp in milliseconds
  84. * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
  85. */
  86. public WindowWrap<T> currentWindow(long timeMillis) {
  87. if (timeMillis < 0) {
  88. return null;
  89. }
  90. int idx = calculateTimeIdx(timeMillis);
  91. // Calculate current bucket start time.
  92. long windowStart = calculateWindowStart(timeMillis);
  93. /*
  94. * Get bucket item at given time from the array.
  95. *
  96. * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
  97. * (2) Bucket is up-to-date, then just return the bucket.
  98. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
  99. */
  100. while (true) {
  101. WindowWrap<T> old = array.get(idx);
  102. if (old == null) {
  103. /*
  104. * B0 B1 B2 NULL B4
  105. * ||_______|_______|_______|_______|_______||___
  106. * 200 400 600 800 1000 1200 timestamp
  107. * ^
  108. * time=888
  109. * bucket is empty, so create new and update
  110. *
  111. * If the old bucket is absent, then we create a new bucket at {@code windowStart},
  112. * then try to update circular array via a CAS operation. Only one thread can
  113. * succeed to update, while other threads yield its time slice.
  114. */
  115. WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  116. if (array.compareAndSet(idx, null, window)) {
  117. // Successfully updated, return the created bucket.
  118. return window;
  119. } else {
  120. // Contention failed, the thread will yield its time slice to wait for bucket available.
  121. Thread.yield();
  122. }
  123. } else if (windowStart == old.windowStart()) {
  124. /*
  125. * B0 B1 B2 B3 B4
  126. * ||_______|_______|_______|_______|_______||___
  127. * 200 400 600 800 1000 1200 timestamp
  128. * ^
  129. * time=888
  130. * startTime of Bucket 3: 800, so it's up-to-date
  131. *
  132. * If current {@code windowStart} is equal to the start timestamp of old bucket,
  133. * that means the time is within the bucket, so directly return the bucket.
  134. */
  135. return old;
  136. } else if (windowStart > old.windowStart()) {
  137. /*
  138. * (old)
  139. * B0 B1 B2 NULL B4
  140. * |_______||_______|_______|_______|_______|_______||___
  141. * ... 1200 1400 1600 1800 2000 2200 timestamp
  142. * ^
  143. * time=1676
  144. * startTime of Bucket 2: 400, deprecated, should be reset
  145. *
  146. * If the start timestamp of old bucket is behind provided time, that means
  147. * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
  148. * Note that the reset and clean-up operations are hard to be atomic,
  149. * so we need a update lock to guarantee the correctness of bucket update.
  150. *
  151. * The update lock is conditional (tiny scope) and will take effect only when
  152. * bucket is deprecated, so in most cases it won't lead to performance loss.
  153. */
  154. if (updateLock.tryLock()) {
  155. try {
  156. // Successfully get the update lock, now we reset the bucket.
  157. return resetWindowTo(old, windowStart);
  158. } finally {
  159. updateLock.unlock();
  160. }
  161. } else {
  162. // Contention failed, the thread will yield its time slice to wait for bucket available.
  163. Thread.yield();
  164. }
  165. } else if (windowStart < old.windowStart()) {
  166. // Should not go through here, as the provided time is already behind.
  167. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  168. }
  169. }
  170. }
  171. /**
  172. * Get the previous bucket item before provided timestamp.
  173. *
  174. * @param timeMillis a valid timestamp in milliseconds
  175. * @return the previous bucket item before provided timestamp
  176. */
  177. public WindowWrap<T> getPreviousWindow(long timeMillis) {
  178. if (timeMillis < 0) {
  179. return null;
  180. }
  181. int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
  182. timeMillis = timeMillis - windowLengthInMs;
  183. WindowWrap<T> wrap = array.get(idx);
  184. if (wrap == null || isWindowDeprecated(wrap)) {
  185. return null;
  186. }
  187. if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
  188. return null;
  189. }
  190. return wrap;
  191. }
  192. /**
  193. * Get the previous bucket item for current timestamp.
  194. *
  195. * @return the previous bucket item for current timestamp
  196. */
  197. public WindowWrap<T> getPreviousWindow() {
  198. return getPreviousWindow(TimeUtil.currentTimeMillis());
  199. }
  200. /**
  201. * Get statistic value from bucket for provided timestamp.
  202. *
  203. * @param timeMillis a valid timestamp in milliseconds
  204. * @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null
  205. */
  206. public T getWindowValue(long timeMillis) {
  207. if (timeMillis < 0) {
  208. return null;
  209. }
  210. int idx = calculateTimeIdx(timeMillis);
  211. WindowWrap<T> bucket = array.get(idx);
  212. if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
  213. return null;
  214. }
  215. return bucket.value();
  216. }
  217. /**
  218. * Check if a bucket is deprecated, which means that the bucket
  219. * has been behind for at least an entire window time span.
  220. *
  221. * @param windowWrap a non-null bucket
  222. * @return true if the bucket is deprecated; otherwise false
  223. */
  224. public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
  225. return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
  226. }
  227. public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
  228. return time - windowWrap.windowStart() > intervalInMs;
  229. }
  230. /**
  231. * Get valid bucket list for entire sliding window.
  232. * The list will only contain "valid" buckets.
  233. *
  234. * @return valid bucket list for entire sliding window.
  235. */
  236. public List<WindowWrap<T>> list() {
  237. return list(TimeUtil.currentTimeMillis());
  238. }
  239. public List<WindowWrap<T>> list(long validTime) {
  240. int size = array.length();
  241. List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
  242. for (int i = 0; i < size; i++) {
  243. WindowWrap<T> windowWrap = array.get(i);
  244. if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
  245. continue;
  246. }
  247. result.add(windowWrap);
  248. }
  249. return result;
  250. }
  251. /**
  252. * Get all buckets for entire sliding window including deprecated buckets.
  253. *
  254. * @return all buckets for entire sliding window
  255. */
  256. public List<WindowWrap<T>> listAll() {
  257. int size = array.length();
  258. List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
  259. for (int i = 0; i < size; i++) {
  260. WindowWrap<T> windowWrap = array.get(i);
  261. if (windowWrap == null) {
  262. continue;
  263. }
  264. result.add(windowWrap);
  265. }
  266. return result;
  267. }
  268. /**
  269. * Get aggregated value list for entire sliding window.
  270. * The list will only contain value from "valid" buckets.
  271. *
  272. * @return aggregated value list for entire sliding window
  273. */
  274. public List<T> values() {
  275. return values(TimeUtil.currentTimeMillis());
  276. }
  277. public List<T> values(long timeMillis) {
  278. if (timeMillis < 0) {
  279. return new ArrayList<T>();
  280. }
  281. int size = array.length();
  282. List<T> result = new ArrayList<T>(size);
  283. for (int i = 0; i < size; i++) {
  284. WindowWrap<T> windowWrap = array.get(i);
  285. if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
  286. continue;
  287. }
  288. result.add(windowWrap.value());
  289. }
  290. return result;
  291. }
  292. /**
  293. * Get the valid "head" bucket of the sliding window for provided timestamp.
  294. * Package-private for test.
  295. *
  296. * @param timeMillis a valid timestamp in milliseconds
  297. * @return the "head" bucket if it exists and is valid; otherwise null
  298. */
  299. WindowWrap<T> getValidHead(long timeMillis) {
  300. // Calculate index for expected head time.
  301. int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
  302. WindowWrap<T> wrap = array.get(idx);
  303. if (wrap == null || isWindowDeprecated(wrap)) {
  304. return null;
  305. }
  306. return wrap;
  307. }
  308. /**
  309. * Get the valid "head" bucket of the sliding window at current timestamp.
  310. *
  311. * @return the "head" bucket if it exists and is valid; otherwise null
  312. */
  313. public WindowWrap<T> getValidHead() {
  314. return getValidHead(TimeUtil.currentTimeMillis());
  315. }
  316. /**
  317. * Get sample count (total amount of buckets).
  318. *
  319. * @return sample count
  320. */
  321. public int getSampleCount() {
  322. return sampleCount;
  323. }
  324. /**
  325. * Get total interval length of the sliding window in milliseconds.
  326. *
  327. * @return interval in second
  328. */
  329. public int getIntervalInMs() {
  330. return intervalInMs;
  331. }
  332. /**
  333. * Get total interval length of the sliding window.
  334. *
  335. * @return interval in second
  336. */
  337. public double getIntervalInSecond() {
  338. return intervalInSecond;
  339. }
  340. public void debug(long time) {
  341. StringBuilder sb = new StringBuilder();
  342. List<WindowWrap<T>> lists = list(time);
  343. sb.append("Thread_").append(Thread.currentThread().getId()).append("_");
  344. for (WindowWrap<T> window : lists) {
  345. sb.append(window.windowStart()).append(":").append(window.value().toString());
  346. }
  347. System.out.println(sb.toString());
  348. }
  349. public long currentWaiting() {
  350. // TODO: default method. Should remove this later.
  351. return 0;
  352. }
  353. public void addWaiting(long time, int acquireCount) {
  354. // Do nothing by default.
  355. throw new UnsupportedOperationException();
  356. }
  357. }