Keyed State and Operator State

Keyed State

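Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
Keyed State is partitioned (sharded) with exactly one state-partition per key. Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>; since each key belongs to exactly one parallel instance of a keyed operator, this can be thought of simply as <operator, key>.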
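To make "one state-partition per key" concrete, here is a minimal plain-Java model, not Flink code. The state behaves like a map from key to value, and the current key is set before each call, so user code only ever reads and writes the slot for that key. The class names and the explicit `setCurrentKey` call are hypothetical; in Flink the runtime does the key scoping for you.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed state: one partition (slot) per key. Not the Flink API.
class KeyedValueStateModel<K, V> {
    private final Map<K, V> partitions = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the key of the record being processed;
    // in this model the caller does it explicitly.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Reads only the slot that belongs to the current key (null if never set).
    V value() { return partitions.get(currentKey); }

    // Writes only the slot that belongs to the current key.
    void update(V v) { partitions.put(currentKey, v); }

    // Removes only the current key's slot.
    void clear() { partitions.remove(currentKey); }
}

class KeyedStateDemo {
    // Counts records per key; each key sees only its own counter.
    static Map<String, Integer> countPerKey(String... keys) {
        KeyedValueStateModel<String, Integer> count = new KeyedValueStateModel<>();
        Map<String, Integer> result = new HashMap<>();
        for (String k : keys) {
            count.setCurrentKey(k);
            Integer c = count.value();
            count.update(c == null ? 1 : c + 1);
            result.put(k, count.value());
        }
        return result;
    }
}
```

For example, `countPerKey("a", "b", "a")` yields a=2 and b=1, because each key increments its own counter and never sees the other's.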

Operator State

With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
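The Kafka example can be sketched in plain Java as follows. This is a hypothetical model, not Flink's Kafka connector: one parallel source instance owns a map from partition to the next offset to read, snapshots a copy of that map at a checkpoint, and resumes from the copy after a failure. The partition names and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one parallel source instance (not Flink's Kafka connector).
// Its operator state is the map of owned partitions to the next offset to read.
class OffsetTrackingInstance {
    private final Map<String, Long> offsets = new HashMap<>();

    // Called when this instance is assigned a partition; reading starts at offset 0.
    void assign(String partition) { offsets.putIfAbsent(partition, 0L); }

    // Records that 'count' more records were consumed from 'partition'.
    void consumed(String partition, long count) { offsets.merge(partition, count, Long::sum); }

    // Current offset for a partition, or -1 if this instance does not own it.
    long offset(String partition) { return offsets.getOrDefault(partition, -1L); }

    // Checkpoint: an independent copy, unaffected by later consumption.
    Map<String, Long> snapshot() { return new HashMap<>(offsets); }

    // Recovery: a fresh instance resumes from the checkpointed offsets.
    static OffsetTrackingInstance restore(Map<String, Long> snapshot) {
        OffsetTrackingInstance instance = new OffsetTrackingInstance();
        instance.offsets.putAll(snapshot);
        return instance;
    }
}
```

Because the state belongs to the instance rather than to a key, nothing here is scoped by record key; the whole map travels with the instance.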

Raw and Managed State

Keyed State and Operator State exist in two forms: managed and raw.
Managed State is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink’s runtime encodes the states and writes them into the checkpoints.
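Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint; Flink knows nothing about the state's data structures and sees only the raw bytes.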
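As a rough illustration of what "raw" means, here is a plain-Java sketch (no Flink API) in which an operator serializes its own map into bytes using a format only it understands. The class name `RawStateCodec` and the byte layout are hypothetical; the point is that whoever stores the resulting `byte[]` cannot interpret or split it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical raw-state codec: the operator owns both the data structure and
// its serialization format, so a checkpoint would only ever see opaque bytes.
class RawStateCodec {
    // Operator-defined layout: [entry count: int] then per entry [key: UTF][value: long].
    static byte[] write(Map<String, Long> counts) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Only code that knows the layout above can turn the bytes back into a map.
    static Map<String, Long> read(byte[] raw) {
        Map<String, Long> counts = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                counts.put(key, in.readLong());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return counts;
    }
}
```

Since the layout is private to the operator, the framework has no way to divide these bytes among more instances or merge them into fewer when the parallelism changes.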
Using managed state (rather than raw state) is recommended, since with managed state Flink can automatically redistribute state when the parallelism is changed and can also manage memory more effectively.
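Redistribution is possible because managed state has a structure Flink understands. For list-style operator state, Flink documents an "even-split" scheme: the lists of all old instances are logically concatenated and then divided into as many sublists as there are new instances. The plain-Java sketch below models that idea; it is not Flink code, and the exact chunk boundaries Flink uses may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of even-split redistribution of list-style operator state.
class EvenSplitRedistribution {
    static <T> List<List<T>> redistribute(List<List<T>> oldInstances, int newParallelism) {
        // 1. Logically concatenate every old instance's list into one list.
        List<T> all = new ArrayList<>();
        for (List<T> part : oldInstances) {
            all.addAll(part);
        }

        // 2. Cut it into newParallelism contiguous chunks whose sizes differ by at most one.
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}
```

Scaling two instances holding [p0, p1, p2] and [p3] to two instances gives [p0, p1] and [p2, p3]; scaling them to three gives [p0, p1], [p2] and [p3]. Raw bytes, by contrast, could not be split this way.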

Using Managed Keyed State

  • ValueState<T>: This keeps a value that can be updated with update(T) and retrieved with T value().
  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements.
    • add(T)
    • addAll(List<T>)
    • Iterable<T> get()
    • update(List<T>)
  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState, but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.
  • AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added; elements added using add(IN) are aggregated using a specified AggregateFunction.
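The differences between these state types are easiest to see side by side. The sketch below uses hypothetical in-memory stand-ins, not the Flink classes: `ListLike` appends and replaces, `ReducingLike` folds inputs into one value of the same type (here with `BinaryOperator` in place of a ReduceFunction), and `AggregatingLike` uses an `Aggregate` interface loosely modeled on AggregateFunction (`createAccumulator`, `add`, `getResult`), so input, accumulator and output types can all differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical in-memory stand-ins illustrating the semantics; not Flink classes.
class StateSemantics {
    // ListState-like: add appends, addAll appends many, update replaces, get returns all.
    static class ListLike<T> {
        private final List<T> elements = new ArrayList<>();
        void add(T t) { elements.add(t); }
        void addAll(List<T> ts) { elements.addAll(ts); }
        void update(List<T> ts) { elements.clear(); elements.addAll(ts); }
        Iterable<T> get() { return elements; }
    }

    // ReducingState-like: one value of the same type as the inputs.
    static class ReducingLike<T> {
        private final BinaryOperator<T> reduce;
        private T value; // null until the first add
        ReducingLike(BinaryOperator<T> reduce) { this.reduce = reduce; }
        void add(T t) { value = (value == null) ? t : reduce.apply(value, t); }
        T get() { return value; }
    }

    // Loosely modeled on an aggregate function: IN, ACC and OUT may all differ.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
    }

    // AggregatingState-like: stores the accumulator, returns the computed result.
    static class AggregatingLike<IN, ACC, OUT> {
        private final Aggregate<IN, ACC, OUT> fn;
        private ACC acc;
        AggregatingLike(Aggregate<IN, ACC, OUT> fn) { this.fn = fn; this.acc = fn.createAccumulator(); }
        void add(IN in) { acc = fn.add(in, acc); }
        OUT get() { return fn.getResult(acc); }
    }

    // Example aggregate: Long inputs, {count, sum} accumulator, Double average output.
    static Aggregate<Long, long[], Double> average() {
        return new Aggregate<Long, long[], Double>() {
            public long[] createAccumulator() { return new long[2]; }
            public long[] add(Long value, long[] acc) { acc[0]++; acc[1] += value; return acc; }
            public Double getResult(long[] acc) { return acc[0] == 0 ? 0.0 : (double) acc[1] / acc[0]; }
        };
    }
}
```

For instance, adding 3 and 4 to a `ReducingLike<Long>` built with `Long::sum` yields the Long 7, while adding the same values to an `AggregatingLike` built with `average()` yields the Double 3.5: the reducing variant cannot change type, the aggregating one can.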

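State is accessed through the RuntimeContext, so it is only available in rich functions. The example below implements a simple counting window: tuples are keyed by their first field, a ValueState stores the count and a running sum per key, and once the count reaches 2 the function emits the average and clears the state so that counting starts over.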

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // access the state value; it is null the first time a key is seen
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        // no default value in the descriptor (that constructor is deprecated);
        // flatMap handles the null initial value instead
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0) // key by the first field (replaces the deprecated keyBy(0))
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
```
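To check the expected output without a Flink cluster, the same logic can be replayed in plain Java. This is a hypothetical trace, not Flink code: a `HashMap` from key to a `{count, sum}` pair stands in for the per-key ValueState, and removing the entry stands in for `sum.clear()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java replay of CountWindowAverage: one {count, sum} slot per key,
// mirroring keyBy on the first field followed by the flatMap logic.
class CountWindowAverageTrace {
    static List<long[]> run(long[][] inputs) {
        Map<Long, long[]> state = new HashMap<>(); // key -> {count, sum}
        List<long[]> out = new ArrayList<>();
        for (long[] in : inputs) {
            long key = in[0];
            // missing state starts at {0, 0}, like the null check in flatMap
            long[] current = state.getOrDefault(key, new long[] {0L, 0L});
            current[0] += 1;     // update the count
            current[1] += in[1]; // add the second field to the running sum
            state.put(key, current);
            if (current[0] >= 2) {
                // integer division, matching Long arithmetic in the Flink version
                out.add(new long[] {key, current[1] / current[0]});
                state.remove(key); // equivalent of sum.clear()
            }
        }
        return out;
    }
}
```

Replaying the five input tuples emits (1, 8/2) = (1,4) after the second element and (1, 11/2) = (1,5) after the fourth; the fifth element, (1,2), only starts a new count and produces no output.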