性能问题

最近在跑flink社区1.15版本使用json_value函数时,发现其性能很差,通过jstack查看堆栈经常在执行以下堆栈
image.png
可以看到这里的逻辑是在等锁,查看jsonpath的LRUCache

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.table.shaded.com.jayway.jsonpath.spi.cache;
  6. import java.util.Deque;
  7. import java.util.LinkedList;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. import java.util.concurrent.locks.ReentrantLock;
  11. import org.apache.flink.table.shaded.com.jayway.jsonpath.JsonPath;
  12. public class LRUCache implements Cache {
  13. private final ReentrantLock lock = new ReentrantLock();
  14. private final Map<String, JsonPath> map = new ConcurrentHashMap();
  15. private final Deque<String> queue = new LinkedList();
  16. private final int limit;
  17. public LRUCache(int limit) {
  18. this.limit = limit;
  19. }
  20. public void put(String key, JsonPath value) {
  21. JsonPath oldValue = (JsonPath)this.map.put(key, value);
  22. if (oldValue != null) {
  23. this.removeThenAddKey(key);
  24. } else {
  25. this.addKey(key);
  26. }
  27. if (this.map.size() > this.limit) {
  28. this.map.remove(this.removeLast());
  29. }
  30. }
  31. public JsonPath get(String key) {
  32. JsonPath jsonPath = (JsonPath)this.map.get(key);
  33. if (jsonPath != null) {
  34. this.removeThenAddKey(key);
  35. }
  36. return jsonPath;
  37. }
  38. private void addKey(String key) {
  39. this.lock.lock();
  40. try {
  41. this.queue.addFirst(key);
  42. } finally {
  43. this.lock.unlock();
  44. }
  45. }
  46. private String removeLast() {
  47. this.lock.lock();
  48. String var2;
  49. try {
  50. String removedKey = (String)this.queue.removeLast();
  51. var2 = removedKey;
  52. } finally {
  53. this.lock.unlock();
  54. }
  55. return var2;
  56. }
  57. private void removeThenAddKey(String key) {
  58. this.lock.lock();
  59. try {
  60. this.queue.removeFirstOccurrence(key);
  61. this.queue.addFirst(key);
  62. } finally {
  63. this.lock.unlock();
  64. }
  65. }
  66. private void removeFirstOccurrence(String key) {
  67. this.lock.lock();
  68. try {
  69. this.queue.removeFirstOccurrence(key);
  70. } finally {
  71. this.lock.unlock();
  72. }
  73. }
  74. ...
  75. }

可以看到get操作时,如果获取到的是有值的,那么会更新相应key的数据从双端队列移到首位,借此来实现LRU的功能,但是这样每次get和put操作都是需要加锁的,因此并发情况下吞吐就会比较低,也会导致cpu使用效率较低。
从jsonpath社区查看相应的问题,也有相关的反馈
https://github.com/json-path/JsonPath/issues/740
https://github.com/apache/pinot/pull/7409
比较方便的是,jsonpath 提供了spi的方式可以自定义的设置Cache的实现类,可以通过以下方式来设置新的cache实现。

  1. static {
  2. CacheProvider.setCache(new JsonPathCache());
  3. }

从pinot的实现中,我们看到他是用了guava的cache来替换了默认的LRUCache实现,那么这样实现性能优化有多少呢,这里我们是用java的性能测试框架jmh来测试下性能提升的情况

性能测试

这里为了方便,直接在flink-benchmark工程里添加了两个benchmark的测试类.
GuavaCache
LRUCache
这里面需要注意,因为cache是进程级别共享的,所以我们需要将设置@State(Benchmark)级别,这样我们构建的cache就是进程级别共享,而不是线程级别共享的。

写的测试是4个线程运行,缓存大小均为400
为了避免在本机运行时受本机的其他程序影响,最好是build jar之后放到服务器上跑

  1. java -jar target/benchmarks.jar -rf csv org.apache.flink.benchmark.GuavaCacheBenchmark

得到一个测试结果

  1. Benchmark Mode Cnt Score Error Units
  2. GuavaCacheBenchmark.get thrpt 30 4480.563 ± 203.311 ops/ms
  3. GuavaCacheBenchmark.put thrpt 30 1774.769 ± 119.198 ops/ms
  4. LRUCacheBenchmark.get thrpt 30 441.239 ± 2.812 ops/ms
  5. LRUCacheBenchmark.put thrpt 30 350.549 ± 12.285 ops/ms

可以看到使用guava的cache后,get性能提升8倍左右,put性能提升5倍左右。
这块性能提升的主要来源是cache的实现机制上,和caffeine 的作业在github上也简单了解了下相关的推荐实现
后面会写一篇文章来专门分析下caffeine cache的优化实现。

参考

https://github.com/ben-manes/caffeine/wiki/Benchmarks#read-100-1 caffeine benchmark
https://github.com/ben-manes/caffeine/blob/master/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/GetPutBenchmark.java caffeine benchmark
https://www.jianshu.com/p/ad34c4c8a2a3 jmh 框架常见参数
http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/ jmh 常见用例