This is fairly simple. It mainly follows official documentation 1 and official documentation 2, supplemented with notes from my own hands-on experience.
Flink version: 1.11.2. The examples in this post come from the official documentation; the complete examples are available on GitHub.

Writing unit tests is one of the fundamental tasks when building production applications. Without tests, even a small code change can break a production job. So whether it is a simple job for data cleaning or model training, or a complex multi-tenant real-time data processing system, we should write unit tests for every kind of application. Below is a guide to unit testing Apache Flink applications. Apache Flink provides a solid unit testing framework to ensure that our applications behave as expected once they go live.

1. Maven Dependencies

To use the unit testing framework provided by Apache Flink, we need to add the following dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>

Notes:

  1. Because the test JARs org.apache.flink:flink-runtime_2.11:tests:1.11.2 and org.apache.flink:flink-streaming-java_2.11:tests:1.11.2 are needed, these dependencies must specify the classifier as tests.
  2. I created my project with the officially recommended Maven setup for Flink, so the pom.xml already contains a flink-streaming-java_${scala.binary.version} entry. In that case you cannot simply add the dependencies above as written; the conflict makes compilation extremely slow or fail. The crux is the test-scoped duplicate: just remove the first dependency above.
  3. When setting up the unit tests, mirror the main directory structure under src, renaming main to test, roughly as shown below. For each class under test, name its test class after it with a Test suffix.

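A rough sketch of the layout, using the package from the examples below (the exact class names here are only illustrative):

    src
    ├── main
    │   └── java
    │       └── com/flink/example/test
    │           └── MyStatelessMap.java
    └── test
        └── java
            └── com/flink/example/test
                └── MyStatelessMapTest.java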

Unit tests are written differently for different kinds of operators. We can divide them into the following three categories:

  • Stateless operators
  • Stateful operators
  • Timed process operators (ProcessFunction)

2. Stateless Operators

The Maven dependencies above are only needed when using the test harnesses, so they can be skipped when writing unit tests for stateless operators.

Writing unit tests for stateless operators is straightforward. We just follow the basic conventions for writing test cases: create an instance of the function class and test the appropriate method. Let's start with a simple Map operator:

    package com.flink.example.test;

    import org.apache.flink.api.common.functions.MapFunction;

    /**
     * Stateless Map operator
     * Created by wy on 2020/11/8.
     */
    public class MyStatelessMap implements MapFunction<String, String> {
        @Override
        public String map(String in) throws Exception {
            String out = "hello " + in;
            return out;
        }
    }

And a stateless FlatMap operator:

    package com.flink.example.test;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class MyStatelessFlatMap implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String out = "hello " + s;
            collector.collect(out);
        }
    }

The unit tests for the two operators above look like this:

    package com.flink.example.test;

    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.functions.util.ListCollector;
    import org.junit.Assert;
    import org.junit.Test;

    import java.util.ArrayList;
    import java.util.List;

    public class StatelessUnitTest {

        @Test
        public void MyStatelessMap() throws Exception {
            MyStatelessMap statelessMap = new MyStatelessMap();
            String out = statelessMap.map("world");
            Assert.assertEquals("hello world", out);
        }

        @Test
        public void MyStatelessFlatMap() throws Exception {
            MyStatelessFlatMap statelessFlatMap = new MyStatelessFlatMap();
            List<String> out = new ArrayList<>();
            ListCollector<String> listCollector = new ListCollector<>(out);
            statelessFlatMap.flatMap("world", listCollector);
            Assert.assertEquals(Lists.newArrayList("hello world"), out);
        }
    }

The FlatMap operator requires a Collector object in addition to the input argument. For the test case we have two options:

  • Mock the Collector object with Mockito
  • Use the ListCollector provided by Flink

I prefer the second approach, because it needs very little code and is sufficient for most situations.
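For comparison, here is a minimal sketch of the Mockito approach. It assumes Mockito is already on the test classpath (it is not among the dependencies listed above), and the test class name is arbitrary:

    package com.flink.example.test;

    import org.apache.flink.util.Collector;
    import org.junit.Test;
    import org.mockito.Mockito;

    public class MyStatelessFlatMapMockTest {

        @Test
        @SuppressWarnings("unchecked")
        public void flatMapWithMockedCollector() throws Exception {
            MyStatelessFlatMap statelessFlatMap = new MyStatelessFlatMap();
            // Mock the Collector instead of collecting into a real list
            Collector<String> collector = (Collector<String>) Mockito.mock(Collector.class);

            statelessFlatMap.flatMap("world", collector);

            // Verify that the mocked Collector received exactly the expected element
            Mockito.verify(collector).collect("hello world");
        }
    }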

3. Stateful Operators

Testing stateful operators (operators that use state or timers) is harder, because user code has to interact with the Flink runtime. For this, Flink provides a collection of test harnesses that can be used to test user-defined functions as well as custom operators:

  • OneInputStreamOperatorTestHarness: for operators on a DataStream
  • KeyedOneInputStreamOperatorTestHarness: for operators on a KeyedStream
  • TwoInputStreamOperatorTestHarness: for operators on a ConnectedStream of two DataStreams
  • KeyedTwoInputStreamOperatorTestHarness: for operators on a ConnectedStream of two KeyedStreams

Let's take a stateful FlatMap function as an example:

    package com.flink.example.test;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class MyStatefulFlatMap extends RichFlatMapFunction<String, Long> {
        ValueState<Long> counterState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
                    "Counter",
                    Types.LONG
            );
            this.counterState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(String s, Collector<Long> collector) throws Exception {
            Long count = 0L;
            if (this.counterState.value() != null) {
                count = this.counterState.value();
            }
            count++;
            this.counterState.update(count);
            collector.collect(count);
        }
    }

The most complicated part of writing a unit test for this class is mocking the application configuration and the runtime context. Using Flink's test harness classes, we do not have to create these mocks ourselves. A unit test using KeyedOneInputStreamOperatorTestHarness looks like this:

    package com.flink.example.test;

    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    public class MyStatefulFlatMapUnitTest {
        private KeyedOneInputStreamOperatorTestHarness<String, String, Long> testHarness;
        private MyStatefulFlatMap statefulFlatMap;

        @Before
        public void setupTestHarness() throws Exception {
            statefulFlatMap = new MyStatefulFlatMap();
            // KeyedOneInputStreamOperatorTestHarness takes three arguments:
            // the operator, a key selector and the key type
            testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                    new StreamFlatMap<>(statefulFlatMap),
                    x -> "1",
                    Types.STRING
            );
            testHarness.open();
        }

        @Test
        public void MyStatefulFlatMap() throws Exception {
            // test first record
            testHarness.processElement("a", 10);
            Assert.assertEquals(
                    Lists.newArrayList(new StreamRecord<>(1L, 10)),
                    this.testHarness.extractOutputStreamRecords()
            );

            // test second record
            testHarness.processElement("b", 20);
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>(1L, 10),
                            new StreamRecord<>(2L, 20)
                    ),
                    testHarness.extractOutputStreamRecords()
            );

            // test remaining records
            testHarness.processElement("c", 30);
            testHarness.processElement("d", 40);
            testHarness.processElement("e", 50);
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>(1L, 10),
                            new StreamRecord<>(2L, 20),
                            new StreamRecord<>(3L, 30),
                            new StreamRecord<>(4L, 40),
                            new StreamRecord<>(5L, 50)
                    ),
                    testHarness.extractOutputStreamRecords()
            );
        }
    }

The generic type parameters of KeyedOneInputStreamOperatorTestHarness are, in order: the key type, followed by the input and output types of MyStatefulFlatMap (the RichFlatMapFunction).

The test harness provides many helper methods; the code above uses three of them:

  • open: calls the open method of the FlatMap function with the relevant parameters and also initializes the context.
  • processElement: lets you pass in an input element together with the timestamp associated with that element.
  • extractOutputStreamRecords: retrieves the output records, with their timestamps, from the Collector.

The test harness greatly simplifies unit testing of stateful operators.
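The harness can also be used to exercise snapshotting and restoring the operator state. A minimal sketch, added to the MyStatefulFlatMapUnitTest class above; it assumes the snapshot and initializeState methods of the Flink 1.11 test harness, and the checkpoint id and timestamps are arbitrary illustrative values:

    // Requires: import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    @Test
    public void restoreFromSnapshot() throws Exception {
        // Process one element so the counter state becomes 1
        testHarness.processElement("a", 10);

        // Take a snapshot of the operator state (checkpoint id 0, timestamp 10)
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 10L);

        // Build a fresh harness around a new function instance and restore the snapshot
        KeyedOneInputStreamOperatorTestHarness<String, String, Long> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new StreamFlatMap<>(new MyStatefulFlatMap()),
                        x -> "1",
                        Types.STRING
                );
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        // The counter continues from the restored value, so the next output is 2
        restoredHarness.processElement("b", 20);
        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(2L, 20)),
                restoredHarness.extractOutputStreamRecords()
        );
    }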

4. Timed Process Operators

Writing unit tests for a time-dependent ProcessFunction is very similar to writing them for stateful operators; in both cases we need a test harness. However, there is another aspect to consider: providing timestamps for events and controlling the application's current time. By setting the current time (processing time or event time) we can trigger the registered timers, which calls the function's onTimer method:

    package com.flink.example.test;

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class TimerProcessFunction extends KeyedProcessFunction<String, String, String> {
        @Override
        public void processElement(String s, Context context, Collector<String> collector) throws Exception {
            context.timerService().registerProcessingTimeTimer(50);
            String out = "hello " + s;
            collector.collect(out);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Triggered when the registered time is reached
            out.collect(String.format("Timer triggered at timestamp %d", timestamp));
        }
    }
We need to test the two methods of the KeyedProcessFunction, namely processElement and onTimer. With the test harness we can control the function's current time, so we can trigger the timer at will instead of waiting for a specific wall-clock time:

    package com.flink.example.test;

    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    public class TimerProcessFunctionUnitTest {
        private OneInputStreamOperatorTestHarness<String, String> testHarness;
        private TimerProcessFunction processFunction;

        @Before
        public void setupTestHarness() throws Exception {
            processFunction = new TimerProcessFunction();
            // KeyedOneInputStreamOperatorTestHarness takes three arguments:
            // the operator, a key selector and the key type
            testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedProcessOperator<>(processFunction),
                    x -> "1",
                    Types.STRING
            );
            // Function time is initialized to 0
            testHarness.open();
        }

        @Test
        public void testProcessElement() throws Exception {
            testHarness.processElement("world", 10);
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>("hello world", 10)
                    ),
                    testHarness.extractOutputStreamRecords()
            );
        }

        @Test
        public void testOnTimer() throws Exception {
            // test first record
            testHarness.processElement("world", 10);
            Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

            // Advance the function time to 50 to fire the registered timer
            testHarness.setProcessingTime(50);
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>("hello world", 10),
                            new StreamRecord<>("Timer triggered at timestamp 50")
                    ),
                    testHarness.extractOutputStreamRecords()
            );
        }
    }

Given how important ProcessFunction is, besides the test harnesses above that can test a ProcessFunction directly, Flink also provides a test-harness factory called ProcessFunctionTestHarnesses that greatly simplifies instantiating a harness. For example:

    package com.flink.example.test;

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class MyProcessFunction extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer integer, Context context, Collector<Integer> collector) throws Exception {
            collector.collect(integer);
        }
    }

Unit testing with ProcessFunctionTestHarnesses is much easier: simply pass in the appropriate arguments and verify the output:

    package com.flink.example.test;

    import com.google.common.collect.Lists;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
    import org.junit.Assert;
    import org.junit.Test;

    public class ProcessFunctionUnitTest {
        @Test
        public void testPassThrough() throws Exception {
            MyProcessFunction processFunction = new MyProcessFunction();
            OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = ProcessFunctionTestHarnesses
                    .forProcessFunction(processFunction);
            testHarness.processElement(1, 10);
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>(1, 10)
                    ),
                    testHarness.extractOutputStreamRecords()
            );
        }
    }

For more examples of how to use ProcessFunctionTestHarnesses to test the different flavors of ProcessFunction (such as KeyedProcessFunction, KeyedCoProcessFunction, BroadcastProcessFunction, and so on), see ProcessFunctionTestHarnessesTest. The factory provides the following methods:

    public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {
    }

    public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {
    }

    public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {
    }

    public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {
    }

    public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor... descriptors) throws Exception {
    }

    public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor... descriptors) throws Exception {
    }

Another complete unit test example:

    package com.flink.example.test;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
    import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
    import org.apache.flink.util.Collector;
    import org.junit.Assert;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.Collections;

    /**
     * ProcessFunction unit test examples
     * Created by wy on 2020/11/9.
     */
    public class ProcessFunctionTestHarnessesTest {
        @Test
        public void testHarnessForProcessFunction() throws Exception {
            ProcessFunction<Integer, Integer> function = new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(
                        Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(value);
                }
            };
            OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
                    .forProcessFunction(function);
            harness.processElement(1, 10);
            Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
        }

        @Test
        public void testHarnessForKeyedProcessFunction() throws Exception {
            KeyedProcessFunction<Integer, Integer, Integer> function = new KeyedProcessFunction<Integer, Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(value);
                }
            };
            OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
                    .forKeyedProcessFunction(function, x -> x, BasicTypeInfo.INT_TYPE_INFO);
            harness.processElement(1, 10);
            Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
        }

        @Test
        public void testHarnessForCoProcessFunction() throws Exception {
            CoProcessFunction<Integer, String, Integer> function = new CoProcessFunction<Integer, String, Integer>() {
                @Override
                public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(value);
                }

                @Override
                public void processElement2(String value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(Integer.parseInt(value));
                }
            };
            TwoInputStreamOperatorTestHarness<Integer, String, Integer> harness = ProcessFunctionTestHarnesses
                    .forCoProcessFunction(function);
            harness.processElement2("0", 1);
            harness.processElement1(1, 10);
            Assert.assertEquals(harness.extractOutputValues(), Arrays.asList(0, 1));
        }

        @Test
        public void testHarnessForKeyedCoProcessFunction() throws Exception {
            KeyedCoProcessFunction<Integer, Integer, Integer, Integer> function = new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
                @Override
                public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(value);
                }

                @Override
                public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    out.collect(value);
                }
            };
            KeyedTwoInputStreamOperatorTestHarness<Integer, Integer, Integer, Integer> harness = ProcessFunctionTestHarnesses
                    .forKeyedCoProcessFunction(function, x -> x, x -> x, TypeInformation.of(Integer.class));
            harness.processElement1(0, 1);
            harness.processElement2(1, 10);
            Assert.assertEquals(harness.extractOutputValues(), Arrays.asList(0, 1));
        }
    }