Testing

This page briefly discusses how to test a Flink application in your IDE or a local environment.

Unit testing

Usually, one can assume that Flink produces correct results outside of user-defined functions. Therefore, it is recommended to cover the Function classes that contain your main business logic with unit tests as much as possible.

For example, if one implements the following ReduceFunction:

Java:

    import org.apache.flink.api.common.functions.ReduceFunction;

    public class SumReduce implements ReduceFunction<Long> {

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

Scala:

    import org.apache.flink.api.common.functions.ReduceFunction

    class SumReduce extends ReduceFunction[Long] {

        override def reduce(value1: Long, value2: Long): Long = {
            value1 + value2
        }
    }

It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output:

Java:

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class SumReduceTest {

        @Test
        public void testSum() throws Exception {
            // instantiate your function
            SumReduce sumReduce = new SumReduce();

            // call the methods that you have implemented
            assertEquals(42L, sumReduce.reduce(40L, 2L).longValue());
        }
    }

Scala:

    import org.scalatest.{FlatSpec, Matchers}

    class SumReduceTest extends FlatSpec with Matchers {

        "SumReduce" should "add values" in {
            // instantiate your function
            val sumReduce: SumReduce = new SumReduce()

            // call the methods that you have implemented
            sumReduce.reduce(40L, 2L) should be (42L)
        }
    }

Integration testing

To test complete Flink streaming pipelines end to end, you can also write integration tests that are executed against a local Flink mini cluster.

In order to do so, add the test dependency flink-test-utils:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.7.1</version>
        <scope>test</scope>
    </dependency>

For example, if you want to test the following MapFunction:

Java:

    import org.apache.flink.api.common.functions.MapFunction;

    public class MultiplyByTwo implements MapFunction<Long, Long> {

        @Override
        public Long map(Long value) throws Exception {
            return value * 2;
        }
    }

Scala:

    import org.apache.flink.api.common.functions.MapFunction

    class MultiplyByTwo extends MapFunction[Long, Long] {

        override def map(value: Long): Long = {
            value * 2
        }
    }

You could write the following integration test:

Java:

    import static org.junit.Assert.assertEquals;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.test.util.AbstractTestBase;
    import org.junit.Test;

    public class ExampleIntegrationTest extends AbstractTestBase {

        @Test
        public void testMultiply() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // configure your test environment
            env.setParallelism(1);

            // values are collected in a static variable
            CollectSink.values.clear();

            // create a stream of custom elements and apply transformations
            env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());

            // execute
            env.execute();

            // verify your results
            assertEquals(Arrays.asList(2L, 42L, 44L), CollectSink.values);
        }

        // create a testing sink
        private static class CollectSink implements SinkFunction<Long> {

            // must be static
            public static final List<Long> values = new ArrayList<>();

            @Override
            public synchronized void invoke(Long value) throws Exception {
                values.add(value);
            }
        }
    }

Scala:

    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.test.util.AbstractTestBase
    import org.junit.Assert.assertEquals
    import org.junit.Test

    class ExampleIntegrationTest extends AbstractTestBase {

        @Test
        def testMultiply(): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment

            // configure your test environment
            env.setParallelism(1)

            // values are collected in a static variable
            CollectSink.values.clear()

            // create a stream of custom elements and apply transformations
            env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink())

            // execute
            env.execute()

            // verify your results
            assertEquals(java.util.Arrays.asList(2L, 42L, 44L), CollectSink.values)
        }
    }

    // create a testing sink
    class CollectSink extends SinkFunction[Long] {

        override def invoke(value: Long): Unit = {
            synchronized {
                CollectSink.values.add(value)
            }
        }
    }

    object CollectSink {
        // must be static
        val values: java.util.List[Long] = new java.util.ArrayList[Long]()
    }

The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could, for example, write the data to files in a temporary directory with your test sink. You can also implement your own custom sources for emitting watermarks; a sketch follows below.
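Such a source can, for example, emit a fixed set of elements together with timestamps and watermarks through the regular SourceFunction interface. The following is a minimal sketch; the class name WatermarkEmittingSource and the hard-coded elements are made up for illustration:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class WatermarkEmittingSource implements SourceFunction<Long> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long[] elements = {1L, 21L, 22L};
            for (long element : elements) {
                if (!running) {
                    return;
                }
                // emit under the checkpoint lock so that emission does not
                // interleave with checkpointing
                synchronized (ctx.getCheckpointLock()) {
                    // use the element value as its event timestamp
                    ctx.collectWithTimestamp(element, element);
                    ctx.emitWatermark(new Watermark(element));
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }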

Testing checkpointing and state handling

One way to test state handling is to enable checkpointing in integration tests.

You can do that by configuring your StreamExecutionEnvironment in the test:

Java:

    env.enableCheckpointing(500);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));

Scala:

    env.enableCheckpointing(500)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))

You can then, for example, add to your Flink application an identity mapper operator that throws an exception once every 1000 ms, as sketched below. However, writing such a test can be tricky because of time dependencies between the actions.
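A sketch of such a mapper could look as follows. The class name is made up for illustration, and this variant fails only once, tracked via a static flag that survives the restart of the task, so that the job can recover from the checkpoint and finish:

    import org.apache.flink.api.common.functions.MapFunction;

    public class OnceFailingIdentityMapper implements MapFunction<Long, Long> {

        // static, so the flag survives the restart of the failed task
        private static volatile boolean hasFailed = false;

        private long firstCallTime = -1;

        @Override
        public Long map(Long value) throws Exception {
            if (firstCallTime < 0) {
                firstCallTime = System.currentTimeMillis();
            }
            if (!hasFailed && System.currentTimeMillis() - firstCallTime >= 1000) {
                hasFailed = true;
                throw new RuntimeException("designed failure for testing recovery");
            }
            // otherwise forward the element unchanged
            return value;
        }
    }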

Another approach is to write a unit test using the Flink internal testing utility AbstractStreamOperatorTestHarness from the flink-streaming-java module.

For an example of how to do that, please have a look at org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest, also in the flink-streaming-java module.
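As a rough sketch of how such a harness can be used, the MultiplyByTwo function from above can be wrapped in a StreamMap operator and driven through a OneInputStreamOperatorTestHarness, one of the concrete subclasses of AbstractStreamOperatorTestHarness. This assumes that the test classes of flink-streaming-java are on the test classpath, for example via its test-jar; the test class name is made up:

    import static org.junit.Assert.assertEquals;

    import org.apache.flink.streaming.api.operators.StreamMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.junit.Test;

    public class MultiplyByTwoHarnessTest {

        @Test
        public void testMultiply() throws Exception {
            // wrap the user-defined function in the operator that runs it
            OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamMap<>(new MultiplyByTwo()));

            // open the operator before pushing elements through it
            testHarness.open();

            // process an element with timestamp 100
            testHarness.processElement(new StreamRecord<>(21L, 100L));

            // the harness collects everything the operator emits
            assertEquals(new StreamRecord<>(42L, 100L), testHarness.getOutput().poll());
        }
    }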

Be aware that AbstractStreamOperatorTestHarness is currently not part of the public API and may change in future releases.