Keyed State and Operator State
Keyed State
Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
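Keyed State is partitioned, with exactly one state partition per key. Each keyed state is logically bound to a unique composite of <parallel-operator-instance, key>; since each key belongs to exactly one parallel instance of a keyed operator, this can be thought of simply as <operator, key>.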
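The "one state partition per key" idea can be pictured with a small plain-Java model. This is only a sketch and not Flink API: the class KeyedCounterSketch and its map are hypothetical stand-ins for the per-key partitions that a keyed operator would hold.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink API): one independent state slot per key.
final class KeyedCounterSketch {
    // Hypothetical stand-in for the per-key state partitions of a keyed operator.
    private final Map<String, Long> countPerKey = new HashMap<>();

    // Process one element: only the slot that belongs to the element's key is touched.
    long process(String key) {
        long updated = countPerKey.getOrDefault(key, 0L) + 1;
        countPerKey.put(key, updated);
        return updated;
    }

    // Feed three elements with keys a, b, a and return the count seen after each one.
    static long[] demo() {
        KeyedCounterSketch op = new KeyedCounterSketch();
        return new long[] { op.process("a"), op.process("b"), op.process("a") };
    }
}
```

In this model the second element with key "a" sees the count left by the first one, while key "b" starts from its own empty slot.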
Operator State
With Operator State, each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
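The Kafka example can be modeled in plain Java as follows. This is a sketch under stated assumptions, not the connector's actual code: OffsetTrackingSketch, recordRead, snapshot, and the partition names are all hypothetical and only illustrate that the map belongs to one parallel instance rather than to a record key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not the Kafka connector's code): each parallel consumer
// instance keeps its own partition -> offset map, independent of record keys.
final class OffsetTrackingSketch {
    private final Map<String, Long> offsets = new HashMap<>();

    // Remember the offset of the last record read from a topic partition.
    void recordRead(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    // What this instance would hand over at a checkpoint: a copy of its map.
    Map<String, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    // One instance reading from two (hypothetical) partitions.
    static Map<String, Long> demo() {
        OffsetTrackingSketch instance0 = new OffsetTrackingSketch();
        instance0.recordRead("orders-0", 41L);
        instance0.recordRead("orders-2", 7L);
        instance0.recordRead("orders-0", 42L); // a later read overwrites the earlier offset
        return instance0.snapshot();
    }
}
```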
Raw and Managed State
Keyed State and Operator State exist in two forms: managed and raw.
Managed State is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink’s runtime encodes the states and writes them into the checkpoints.
Raw State is state that operators keep in their own data structures.
Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.
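To make "redistribute state when the parallelism is changed" more concrete, here is a plain-Java sketch of one possible scheme for list-style state: concatenate the lists held by the old instances and deal the elements out to the new instances. The class RescaleSketch and the round-robin assignment are illustrative assumptions, not Flink's actual redistribution algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of redistributing list-style state on rescale.
final class RescaleSketch {
    // Concatenate the per-instance lists, then assign the elements round-robin
    // to newParallelism instances. The round-robin choice is illustrative only.
    static <T> List<List<T>> redistribute(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldState) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }
}
```

A runtime can only do this when it understands how the state is structured, which is the case for managed state; raw state is opaque bytes from its point of view.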
Using Managed Keyed State
- ValueState<T>: This keeps a value that can be updated and retrieved.
- ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements.
  - add(T)
  - addAll(List<T>)
  - Iterable<T> get()
  - update(List<T>)
- ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState, but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.
- AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state.
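The difference between list-style and reducing-style state can be sketched in plain Java. The classes below are hypothetical models, not Flink's interfaces: ListLike keeps every added element, while ReducingLike folds each added element into a single aggregate using a caller-supplied function (standing in for the ReduceFunction).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java models (not Flink API) of list-style and reducing-style state.
final class StateSemanticsSketch {

    // List-style state: add() appends, get() exposes every stored element.
    static final class ListLike<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T value) {
            elements.add(value);
        }

        Iterable<T> get() {
            return elements;
        }
    }

    // Reducing-style state: add() folds the value into one running aggregate.
    static final class ReducingLike<T> {
        private final BinaryOperator<T> reduce;
        private T current; // null until the first add

        ReducingLike(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        void add(T value) {
            current = (current == null) ? value : reduce.apply(current, value);
        }

        T get() {
            return current;
        }
    }
}
```

Adding 3, 5 and 7 to both models leaves three elements in the list-style state and the single value 15 in the reducing-style state when the function is a sum.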
State is accessed using the RuntimeContext, so it is only possible in rich functions.
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            // access the state value
            Tuple2<Long, Long> currentSum = sum.value();

            // update the count
            currentSum.f0 += 1;

            // add the second field of the input value
            currentSum.f1 += input.f1;

            // update the state
            sum.update(currentSum);

            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }

    // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(value -> value.f0) // key by the first tuple field
            .flatMap(new CountWindowAverage())
            .print();

    // the printed output will be (1,4) and (1,5)
