虽然 Nexmark 目前仅支持了 Flink 引擎,但在当前也具有一定的意义,例如:简介: 每一种引擎有其优势的地方,如何选择适合自己业务的流计算引擎成了一个由来已久的话题。除了比较各个引擎提供的不同的功能矩阵之外,性能是一个无法绕开的评估因素。基准测试(benchmark)就是用来评估系统性能的一个重要和常见的过程。
- 作为 Flink 引擎版本迭代之间的性能测试工具,甚至是日常回归工具,及时发现性能回退的问题。
- 在开发 Flink 性能优化的功能时,可以用来验证性能优化的效果。
- 部分公司可能会有 Flink 的内部版本,可以用作内部版本与开源版本之间的性能对比工具。
现有流计算基准测试的问题
目前在流计算领域中,还没有一个行业标准的基准测试。目前业界较为人知的流计算 benchmark 是五年前雅虎 Storm 团队发布的 Yahoo Streaming Benchmarks。雅虎的原意是因为业界缺少反映真实场景的 benchmark,模拟了一个简单的广告场景来比较各个流计算框架,后来被广泛引用。具体场景是从 Kafka 消费的广告的点击流,关联 Redis 中的广告所属的 campaign 信息,然后做时间窗口聚合计数。 然而,正是因为雅虎团队太过于追求还原真实的生产环境,导致这些外部系统服务(Kafka, Redis)成为了作业的瓶颈。Ververica 曾在这篇文章中做过一个扩展实验,将数据源从 Kafka 替换成了一个内置的 datagen source,性能提升了 37 倍! 由此可见,引入的 Kafka 组件导致了无法准确反映引擎真实的性能。更重要的一个问题是,Yahoo Benchmark 只包含一个非常简单的,类似 “Word Count” 的作业,它无法全面地反映当今复杂的流计算系统和业务。试想,谁会用一个简单的 “Word Count” 去衡量比较各个数据库之间的性能差异呢?正是这些原因使得 Yahoo Benchmark 无法成为 一个行业标准的基准测试。这也正是我们想要解决的问题。 因此,我们认为一个行业标准的基准测试应该具备以下几个特点:1. 可复现性
可复现性是使得 benchmark 被信任的一个重要条件。许多 benchmark 的结果是难以重现的。有的是因为只摆了个 benchmark 结果图,用于生成这些结果的代码并没有公开。有的是因为用于 benchmark 的硬件不容易被别人获取到。有的是因为 benchmark 依赖的服务太多,致使测试结果不稳定。2. 能调整作业的负载(数据量、数据分布)
例如数据库领域非常著名的 TPC-H、TPC-DS 涵盖了大量的 query 集合,来捕获查询引擎之间细微的差别。而且这些 query 集合都立于真实业务场景之上(商品零售行业),数据规模大,因此也很受一些大数据系统的青睐。3. 能调整作业的负载。即数据量、数据分布
在大数据领域,不同的数据规模对于引擎来说可能会是完全不同的事情。例如 Yahoo Benchmark 中使用的 campaign id 只有 100 个,使得状态非常小,内存都可以装的下。这样使得同步 IO 和 checkpoint 等的影响可以忽略不计。而真实的场景往往要面对大状态,面临的挑战要复杂困难的多。像 TPC-DS 的数据生成工具会提供 scalar factor 的参数来控制数据量。其次在数据分布上最好也能贴近真实世界的数据,如有数据倾斜,及调整倾斜比例。从而能全面、综合地反映业务场景和引擎之间地差异。4. 有统一的性能衡量指标和采集汇总工具
基准测试的性能指标的定义需要清晰、一致,且能适用于各种计算引擎。然而流计算的性能指标要比传统批处理的更难定义、更难采集。是流计算 benchmark 最具挑战性的一个问题,这也会在下文展开描述。 我们也研究了很多其他的流计算相关的基准测试,包括:StreamBench、HiBench、BigDataBench,但是它们都在上述几个基本面有所欠缺。基准测试的行业标杆无疑是 TPC 发布的一系列 benchmark,如 TPC-H,TPC-DS。然而这些 benchmark 是面向传统数据库、传统数仓而设计的,并不适用于今天的流计算系统。例如 benchmark 中没有考虑事件时间、数据的乱序、窗口等流计算中常见的场景。Nexmark 基准测试框架的设计
移除外部 source、sink 依赖
如上所述,Yahoo Benchmark 使用了 Kafka 数据源,却使得最终结果无法准确反映引擎的真实性能。此外,我们还发现,在 benchmark 快慢流双流 JOIN 的场景时,如果使用了 Kafka 数据源,慢流会超前消费(快流易被反压),导致 JOIN 节点的状态会缓存大量超前的数据。这其实不能反映真实的场景,因为在真实的场景下,慢流是无法被超前消费的(数据还未产生)。所以我们在 Nexmark 中使用了 datagen source,数据直接在内存中生成,数据不落地,直接向下游节点发送。多个事件流都由单一的数据生成器生成,所以当快流被反压时,也能抑制慢流的生成,较好地反映了真实场景。 与之类似的,我们也移除了外部 sink 的依赖,不再输出到 Kafka/Redis,而是输出到一个空 sink 中,即 sink 会丢弃收到的所有数据。 通过这种方式,我们保证了瓶颈只会在引擎自身,从而能精确地测量出引擎之间细微的差异。Metrics
批处理系统 benchmark 的 metric 通常采用总体耗时来衡量。然而流计算系统处理的数据是源源不断的,无法统计 query 耗时。因此,我们提出三个主要的 metric:吞吐、延迟、CPU。Nexmark 测试框架会自动帮我们采集 metric,并做汇总,不需要部署任何第三方的 metric 服务。吞吐
吞吐(throughput)也常被称作 TPS,描述流计算系统每秒能处理多少条数据。由于我们有多个事件流,所有事件流都由一个数据生成器生成,为了统一观测角度,我们采用数据生成器的 TPS,而非单一事件流的 TPS。我们将一个 query 能达到的<font style="color:#E8323C;">最大吞吐,作为其吞吐指标</font>
。例如,针对 Flink 引擎,我们通过 Flink REST API 暴露的 延迟
CPU
资源使用率是很多流计算 benchmark 中忽视的一个指标。由于在真实生产环境,我们并不会限制流计算引擎所能使用的核数,从而给系统更大的弹性。所以我们引入了 CPU 使用率,作为辅助指标,即作业一共消耗了多少核。通过 吞吐/cores,可以计算出平均每个核对于吞吐的贡献。对于进程的 CPU 使用率的采集,我们没有使用 JVM CPU load,而是借鉴了 YARN 中的实现,通过采样 /proc/Query 与 Schema
Nexmark 的业务模型基于一个真实的在线拍卖系统。所有的 query 都基于相同的三个数据流,三个数据流会有一个数据生成器生成,来控制他们之间的比例、数据偏斜、关联关系等等。这三个数据流分别是:- 用户(Person):代表一个提交拍卖,或参与竞标的用户。
- 拍卖(Auction):代表一个拍卖品。
- 竞标(Bid): 代表一个对拍卖品的出价。
Query | 标题 | 简介 | Flink |
---|---|---|---|
q0 | Pass Through | 测量空跑时的开销,包括监控和数据生成器的开销。 | ✅ |
q1 | Currency Conversion | 将每个竞标价格从美元转换为欧元。 | ✅ |
q2 | Selection | 过滤出满足条件的竞标记录。 | ✅ |
q3 | Local Item Suggestion | 来自指定城市的用户的拍卖品。展示了双流 JOIN。 | ✅ |
q4 | Average Price for a Category | 求出在每个分类下,获胜竞标的平均价格。 | ✅ |
q5 | Hot Items | 在过去一段时间,哪些拍卖品收到了最多的竞标? | ✅ |
q6 | Average Selling Price by Seller | 每个卖家过去10个成功售出的拍卖品的平均价格是多少? | FLINK-19059 |
q7 | Highest Bid | 过去一段时间出价最高的竞标,及其竞标价格。 | ✅ |
q8 | Monitor New Users | 过去一段时间新进入系统并创建拍卖的用户。 | ✅ |
q9 | Winning Bids | 计算每个拍卖品的获胜竞标记录。 | ✅ |
q10 | Log to File System | 将所有事件记录到文件系统。展示了将数据流按窗口写入分区文件。 | ✅ |
q11 | User Sessions | 每个用户在每个活跃周期中进行了多少次出价?展示了 session window。 | ✅ |
q12 | Processing Time Windows | 每个用户在固定的处理时间窗口中进行了多少次出价?展示了 processing time window。 | ✅ |
q13 | Bounded Side Input Join | 竞标流与一个静态白名单关联,展示了基础的维表关联。 | ✅ |
q14 | Calculation | 为竞标流转化和生成更多的字段。展示了更复杂的映射、过滤、UDF 的使用。 | ✅ |
q15 | Bidding Statistics Report | 每天有多少不同的用户参与了不同等级的拍卖中?展示了多 count distinct 的应用。 | ✅ |
q16 | Channel Statistics Report | How many distinct users join the bidding for different level of price for a channel? Illustrates multiple distinct aggregations with filters for multiple keys. | ✅ |
q17 | Auction Statistics Report | How many bids on an auction made a day and what is the price? Illustrates an unbounded group aggregation. | ✅ |
q18 | Find last bid | What’s a’s last bid for bidder to auction? Illustrates a Deduplicate query. | ✅ |
q19 | Auction TOP-10 Price | What’s the top price 10 bids of an auction? Illustrates a TOP-N query. | ✅ |
q20 | Expand bid with auction | Get bids with the corresponding auction information where category is 10. Illustrates a filter join. | ✅ |
q21 | Add channel id | Add a channel_id column to the bid table. Illustrates a ‘CASE WHEN’ + ‘REGEXP_EXTRACT’ SQL. | ✅ |
q22 | Get URL Directories | What is the directory structure of the URL? Illustrates a SPLIT_INDEX SQL. |
注意:q1~q8 来自原始 NEXMark 查询,q0 和 q9~q13 来自 Apache Beam,其他扩展以覆盖更多场景。
使用 Nexmark 进行基准测试
要求
Flink 集群
Nexmark 基准框架在 standalone cluster 上运行 Flink 查询,请参阅 Flink 文档了解更详细的要求以及如何设置它。
软件要求
集群应该由一个 master 节点和一个或多个 worker 节点组成。 所有节点都应该使用 Linux 环境(CPU 监控脚本需要在 Linux 上运行)。 请确保你在每个节点上安装了以下软件:
- JDK 1.8.x 或更高版本
- ssh
在所有集群节点上配置 免密 SSH 和以及创建相同的目录结构。
环境变量
在每个节点上为 Flink 和 Nexmark 脚本设置以下环境变量。
- JAVA_HOME:指向你的JDK安装目录。
- FLINK_HOME:指向你的 Flink 安装目录。
构建 Nexmark
在开始运行基准测试之前,首先需要构建 Nexmark 基准测试以获得一个基准测试包。 请确保你已在构建机器中安装了 maven
。 并在 nexmark-flink
目录下运行 ./build.sh
命令。 然后你会得到目录下的 nexmark-flink.tgz
。
Nexmark 资源配置
节点
- 1 个 master 节点(nexmark01),2 个 worker 节点(nexmark02,nexmark03)
- 每个节点有 8 个内核和 32 GB RAM
- 100 GB SSD 本地磁盘
Flink 配置
使用 nexmark-flink/src/main/resources/conf/ 中定义的默认配置文件 flink-conf.yaml
和 sql-client-defaults.yaml
。
一些值得注意的配置包括:
- 8 个 TaskManager,每个只有 1 个 slot
- 每个 TaskManager 和 JobManager 8 GB
- Job 并行度:8
- 启用了 exactly once 模式和 3 分钟间隔的 checkpoint
- 使用启用增量检查点的 RocksDB 状态后端
- 以 2 秒间隔和 5000 行启用 MiniBatch 优化
- 启用拆分不同聚合优化
Flink 版本:1.14.3。
工作负载
Source event 总数为 100,000,000(1亿)。 Source 每秒生成 10 M 条记录。 3个流的比例是 Bid: 92%, Auction: 6%, Person: 2%。 ## 设置集群 1. 下载最新的 Flink 包flink-<version>-bin-scala_2.11.tgz
plain
wget https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz
2. 将归档文件(flink-<version>-bin-scala_2.11.tgz
,nexmark-flink.tgz
)复制到 master 节点并解压
plain
tar xzf flink-1.14.3-bin-scala_2.11.tgz
mv flink-1.14.3-bin-scala_2.11 flink
tar xzf nexmark-flink.tgz
mv nexmark-flink nexmark
3. 将 nexmark/lib
下的 jar 包复制到 flink/lib
,其中包含 Nexmark source 生成器。
4. 配置 Flink
- 编辑 flink/conf/workers
并输入每个 worker 节点的 IP 地址。(建议设置 8 个条目,每个节点可以启动多个 TaskManager )
plain
nexmark02
nexmark02
nexmark02
nexmark02
nexmark03
nexmark03
nexmark03
nexmark03
- 编辑 flink/conf/master
并输入 master 节点的 IP 地址。
plain
nexmark01:8081
- 将 flink/conf/sql-client-defaults.yaml
替换为 nexmark/conf/sql-client-defaults.yaml
- 将 flink/conf/flink-conf.yaml
替换为 nexmark/conf/flink-conf.yaml
。并更新以下配置:
将 jobmanager.rpc.address
设置为 Master 的 IP 地址
将 state.checkpoints.dir
设置为本地文件路径(推荐使用 SSD),例如文件:<font style="color:rgb(36, 41, 47);">file:///home/username/checkpoint</font>
。
* 将 state.backend.rocksdb.localdir
设置为本地文件路径(推荐使用 SSD),例如 <font style="color:rgb(36, 41, 47);">/home/username/rocksdb</font>
。
5. 配置 Nexmark 基准测试,配置文件 nexmark/conf/nexmark.yaml
- 将nexmark.metric.reporter.host
设置为 master 节点的 IP 地址
- 将 flink.rest.address
设置为 master 节点的 IP 地址
6. 使用 scp
将 flink 和 nexmark 复制到 worker 节点
7. 在 master 节点上运行 flink/bin/start-cluster.sh
启动 Flink Cluster
8. 在 master 节点上运行 nexmark/bin/setup_cluster.sh
来设置基准测试集群
如果使用 Kafka 源而不是 datagen 源,则需要准备 Kafka 源数据
+ 下载 flink-sql-connector-kafka 并复制到 flink/lib
bash
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.3/flink-sql-connector-kafka_2.11-1.14.3.jar
mv flink-sql-connector-kafka_2.11-1.14.3.jar flink/lib
+ 启动 kafka 集群(推荐使用 SSD)
+ 创建 kafka 主题
bash
bin/kafka-topics.sh --create --topic nexmark --bootstrap-server localhost:9092 --partitions 8
+ 编辑nexmark/conf/nexmark.yaml
,设置 kafka.bootstrap.servers
+ 准备源数据:nexmark/bin/run_query.sh insert_kafka
+ 注意:Kafka 源是无穷无尽的,现在只支持 tps 模式(unlimited events.num)
运行 Nexmark
通过在 master 节点上运行 nexmark/bin/run_query.sh all
来运行 Nexmark 基准测试。 它将运行所有查询,并自动收集基准指标。 默认情况下,完成基准测试需要 50 分钟。 最后,它将在控制台上打印基准测试摘要结果(Cores * Time(s) for each query)。
你还可以通过运行 nexmark/bin/run_query.sh q1,q2
来运行特定查询。
你还可以通过编辑 nexmark/conf/nexmark.yaml
中的 nexmark.workload.*
选项来调整查询的工作负载。
Nexmark 基准测试结果
- 基准测试结果-**datagen 源**
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
| Nexmark Query | Events Num | Cores | Time(s) | Cores * Time(s) | Throughput/Cores |
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|q0 |100,000,000 |8.45 |76.323 |645.087 |155.02 K/s |
|q1 |100,000,000 |8.26 |76.643 |633.165 |157.94 K/s |
|q2 |100,000,000 |8.23 |69.309 |570.736 |175.21 K/s |
|q3 |100,000,000 |8.59 |76.531 |657.384 |152.12 K/s |
|q4 |100,000,000 |12.85 |226.605 |2912.841 |34.33 K/s |
|q5 |100,000,000 |10.8 |418.242 |4516.930 |22.14 K/s |
|q7 |100,000,000 |14.21 |570.983 |8112.884 |12.33 K/s |
|q8 |100,000,000 |9.42 |72.673 |684.288 |146.14 K/s |
|q9 |100,000,000 |16.11 |435.882 |7022.197 |14.24 K/s |
|q10 |100,000,000 |8.09 |213.795 |1729.775 |57.81 K/s |
|q11 |100,000,000 |10.6 |237.599 |2518.946 |39.7 K/s |
|q12 |100,000,000 |13.69 |96.559 |1321.536 |75.67 K/s |
|q13 |100,000,000 |8.24 |92.839 |764.952 |130.73 K/s |
|q14 |100,000,000 |8.28 |74.861 |620.220 |161.23 K/s |
|q15 |100,000,000 |8.73 |158.224 |1380.927 |72.42 K/s |
|q16 |100,000,000 |11.51 |466.008 |5362.602 |18.65 K/s |
|q17 |100,000,000 |9.24 |92.666 |856.162 |116.8 K/s |
|q18 |100,000,000 |12.49 |149.076 |1862.171 |53.7 K/s |
|q19 |100,000,000 |21.38 |106.190 |2270.551 |44.04 K/s |
|q20 |100,000,000 |17.27 |305.099 |5267.805 |18.98 K/s |
|q21 |100,000,000 |8.33 |121.845 |1015.293 |98.49 K/s |
|q22 |100,000,000 |8.25 |93.244 |769.471 |129.96 K/s |
|Total |2,200,000,000 |243.029 |4231.196 |51495.920 |1.89 M/s |
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
- 基准测试结果-**Kafka 源**