本页简要讨论如何在IDE或本地环境中测试Flink应用程序。
单元测试
通常,可以假设Flink在用户定义之外产生正确的结果Function
。因此,建议Function
尽可能使用单元测试来测试包含主业务逻辑的类。
例如,如果实现以下内容ReduceFunction
:
public class SumReduce implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
class SumReduce extends ReduceFunction[Long] {
override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
value1 + value2
}
}
通过传递合适的参数并验证输出,可以很容易地使用您喜欢的框架对其进行单元测试:
public class SumReduceTest {
@Test
public void testSum() throws Exception {
// instantiate your function
SumReduce sumReduce = new SumReduce();
// call the methods that you have implemented
assertEquals(42L, sumReduce.reduce(40L, 2L));
}
}
class SumReduceTest extends FlatSpec with Matchers {
"SumReduce" should "add values" in {
// instantiate your function
val sumReduce: SumReduce = new SumReduce()
// call the methods that you have implemented
sumReduce.reduce(40L, 2L) should be (42L)
}
}
集成测试
为了端到端测试Flink流管道,您还可以编写针对本地Flink迷你集群执行的集成测试。
为此,添加测试依赖项flink-test-utils
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
例如,如果要测试以下内容MapFunction
:
public class MultiplyByTwo implements MapFunction<Long, Long> {
@Override
public Long map(Long value) throws Exception {
return value * 2;
}
}
class MultiplyByTwo extends MapFunction[Long, Long] {
override def map(value: Long): Long = {
value * 2
}
}
您可以编写以下集成测试:
public class ExampleIntegrationTest extends AbstractTestBase {
@Test
public void testMultiply() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(1);
// values are collected in a static variable
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new MultiplyByTwo())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
}
// create a testing sink
private static class CollectSink implements SinkFunction<Long> {
// must be static
public static final List<Long> values = new ArrayList<>();
@Override
public synchronized void invoke(Long value) throws Exception {
values.add(value);
}
}
}
class ExampleIntegrationTest extends AbstractTestBase {
@Test
def testMultiply(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// configure your test environment
env.setParallelism(1)
// values are collected in a static variable
CollectSink.values.clear()
// create a stream of custom elements and apply transformations
env
.fromElements(1L, 21L, 22L)
.map(new MultiplyByTwo())
.addSink(new CollectSink())
// execute
env.execute()
// verify your results
assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
}
}
// create a testing sink class CollectSink extends SinkFunction[Long] {
override def invoke(value: java.lang.Long): Unit = {
synchronized {
values.add(value)
}
}
}
object CollectSink {
// must be static
val values: List[Long] = new ArrayList()
}
CollectSink
此处使用静态变量in ,因为Flink在将所有 算子分布到集群之前将其序列化。通过静态变量与本地Flink迷你集群实例化的算子进行通信是解决此问题的一种方法。或者,您可以使用测试接收器将数据写入临时目录中的文件。您还可以实现自己的自定义源以发出水印。
测试检查点和状态处理
测试状态处理的一种方法是在集成测试中启用检查点。
您可以通过StreamExecutionEnvironment
在测试中配置来完成此 算子操作:
env.enableCheckpointing(500);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
env.enableCheckpointing(500)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
例如,向Flink应用程序添加一个身份映射器 算子,该 算子将每次抛出一次异常1000ms
。但是,由于动作之间存在时间依赖关系,因此编写此类测试可能会非常棘手。
另一种方法是写使用Flink内部测试效用一个单元测试AbstractStreamOperatorTestHarness
从flink-streaming-java
模块。
对于如何做到这一点,请看看在一个例子org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest
中也flink-streaming-java
模块。
请注意,AbstractStreamOperatorTestHarness
目前它不是公共API的一部分,可能会有所变化。