本页简要讨论如何在IDE或本地环境中测试Flink应用程序。
单元测试
通常,可以假设Flink在用户定义之外产生正确的结果Function。因此,建议Function尽可能使用单元测试来测试包含主业务逻辑的类。
例如,如果实现以下内容ReduceFunction:
public class SumReduce implements ReduceFunction<Long> {@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}}
class SumReduce extends ReduceFunction[Long] {override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {value1 + value2}}
通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:
public class SumReduceTest {@Testpublic void testSum() throws Exception {// instantiate your functionSumReduce sumReduce = new SumReduce();// call the methods that you have implementedassertEquals(42L, sumReduce.reduce(40L, 2L));}}
class SumReduceTest extends FlatSpec with Matchers {"SumReduce" should "add values" in {// instantiate your functionval sumReduce: SumReduce = new SumReduce()// call the methods that you have implementedsumReduce.reduce(40L, 2L) should be (42L)}}
集成测试
为了端到端测试Flink流管道,您还可以编写针对本地Flink迷你集群执行的集成测试。
为此,添加测试依赖项flink-test-utils:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils_2.11</artifactId><version>1.7-SNAPSHOT</version></dependency>
例如,如果要测试以下内容MapFunction:
public class MultiplyByTwo implements MapFunction<Long, Long> {@Overridepublic Long map(Long value) throws Exception {return value * 2;}}
class MultiplyByTwo extends MapFunction[Long, Long] {override def map(value: Long): Long = {value * 2}}
您可以编写以下集成测试:
public class ExampleIntegrationTest extends AbstractTestBase {@Testpublic void testMultiply() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(1);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(1L, 21L, 22L).map(new MultiplyByTwo()).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);}// create a testing sinkprivate static class CollectSink implements SinkFunction<Long> {// must be staticpublic static final List<Long> values = new ArrayList<>();@Overridepublic synchronized void invoke(Long value) throws Exception {values.add(value);}}}
class ExampleIntegrationTest extends AbstractTestBase {@Testdef testMultiply(): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// configure your test environmentenv.setParallelism(1)// values are collected in a static variableCollectSink.values.clear()// create a stream of custom elements and apply transformationsenv.fromElements(1L, 21L, 22L).map(new MultiplyByTwo()).addSink(new CollectSink())// executeenv.execute()// verify your resultsassertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)}}// create a testing sink class CollectSink extends SinkFunction[Long] {override def invoke(value: java.lang.Long): Unit = {synchronized {values.add(value)}}}object CollectSink {// must be staticval values: List[Long] = new ArrayList()}
CollectSink此处使用静态变量in ,因为Flink在将所有 算子分布到集群之前将其序列化。通过静态变量与本地Flink迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。
测试检查点和状态处理
测试状态处理的一种方法是在集成测试中启用检查点。
您可以通过StreamExecutionEnvironment在测试中配置来完成此 算子操作:
env.enableCheckpointing(500);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
env.enableCheckpointing(500)env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
例如,向Flink应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常1000ms。但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。
另一种方法是写使用Flink内部测试效用一个单元测试AbstractStreamOperatorTestHarness从flink-streaming-java模块。
对于如何做到这一点,请看看在一个例子org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest中也flink-streaming-java模块。
请注意,AbstractStreamOperatorTestHarness目前它不是公共API的一部分,可能会有所变化。
