https://copyfuture.com/blogs-details/202206111206047704

1.1 有状态算子

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,有状态的算子任务,除当前数据外,还需要一些其他数据来得到计算结果。
最常见的状态(state),就是之前到达的数据,或者由之前数据计算出的某个结果
如:sum计算时,需要保存之前所有数据的和;窗口算子中会保存已经到达的所有数据;检索先有下单行为,后有支付行为的事件模式(event pattern),也应该把之前的行为保存下来。

1.2 状态的管理

Flink 将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。Flink 有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。
——— 状态的访问权限。 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream的,数据会按照 key 的哈希值进行分区,聚合处理的结果只对当前 key 有效。然而同一个分区( slot)上执行的任务实例,可能会包含多个 key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。
——— 容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。
——— 分布式应用的横向扩展性。处理的数据量增大时,相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。

1.3 状态的分类

1.3.1 托管状态(Managed State)和原始状态(Raw State)

托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现;而原始状态则是自定义的,相当于就是开辟了一块内存,需要自己管理,实现状态的序列化和故障恢复
托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。对于具体的状态内容,Flink 提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,都是托管状态;也可以在富函数类(RichFunction)中通过上下文来自定义状态。
原始状态全部需要自定义。Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
所以只有在遇到托管状态无法实现的特殊需求时,才会考虑使用原始状态;绝大多数应用场景,用 Flink 提供的算子或者自定义托管状态来实现需求


1.3.2 算子状态(Operator State)和按键分区状态(Keyed State)

将托管状态分为两类:算子状态和按键分区状态
(1)算子状态(Operator State)
状态作用范围限定为当前的算子任务实例,只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。
算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,需进一步实现 CheckpointedFunction 接口。

(2)按键分区状态(Keyed State)
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。

2. 按键分区状态(Keyed State)

2.1 概念

因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。
在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。

2.2 支持的结构类型

2.2.1 值状态(ValueState)

  1. public interface ValueState<T> extends State {
  2. T value() throws IOException;
  3. void update(T value) throws IOException;
  4. }
  5. // T value():获取当前状态的值;
  6. // update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值

在具体使用时,为了让运行时上下文清楚到底是哪个状态,需要创建一个状态描述器(StateDescriptor)来提供状态的基本信息。
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { }
代码演示:

  1. package org.example.flink.state;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.api.common.state.ValueState;
  4. import org.apache.flink.api.common.state.ValueStateDescriptor;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. /**
  7. * @ProjectName heima_flink_study
  8. * @ClassName PeriodicPvExample
  9. * @Email 18238287049@163.com
  10. * @Author zhanggp
  11. * @Date 2022/06/25 09:37
  12. * @Description 使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,隔一段时间发送 pv 的统计结果
  13. */
  14. public class PeriodicPvExample {
  15. public static void main(String[] args) throws Exception {
  16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17. env.setParallelism(1);
  18. SingleOutputStreamOperator<Event> stream = env
  19. .addSource(new ClickSource())
  20. .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
  21. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  22. @Override
  23. public long extractTimestamp(Event element, long recordTimestamp) {
  24. return element.timestamp;
  25. }
  26. })
  27. );
  28. stream.print("input");
  29. // 统计每个用户的 pv,隔一段时间(10s)输出一次结果
  30. stream.keyBy(data -> data.user)
  31. .process(new PeriodicPvResult())
  32. .print();
  33. env.execute();
  34. }
  35. // 注册定时器,周期性输出 pv
  36. public static class PeriodicPvResult extends KeyedProcessFunction<String, Event, String> {
  37. ValueState<Long> countState;
  38. ValueState<Long> timerTsState;
  39. @Override
  40. public void open(Configuration parameters) throws Exception {
  41. countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
  42. timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
  43. }
  44. @Override
  45. public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
  46. // 更新 count 值
  47. Long count = countState.value();
  48. if (count == null) {
  49. countState.update(1L);
  50. } else {
  51. countState.update(count + 1);
  52. }
  53. // 注册定时器
  54. if (timerTsState.value() == null) {
  55. ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
  56. timerTsState.update(value.timestamp + 10 * 1000L);
  57. }
  58. }
  59. @Override
  60. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  61. out.collect(ctx.getCurrentKey() + " pv: " + countState.value());
  62. // 清空状态
  63. timerTsState.clear();
  64. }
  65. }
  66. }

2.2.2 列表状态(ListState)

  1. Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型 Iterable<T>;
  2. update(List<T> values):传入一个列表 values,直接对状态进行覆盖;
  3. add(T value):在状态列表中添加一个元素 value
  4. addAll(List<T> values):向列表中添加多个元素,以列表 values 形式传入
  1. public class TwoStreamFullJoinExample {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.fromElements(
  6. Tuple3.of("a", "stream-1", 1000L),
  7. Tuple3.of("b", "stream-1", 2000L)
  8. )
  9. .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
  10. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
  11. @Override
  12. public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
  13. return t.f2;
  14. }
  15. })
  16. );
  17. SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.fromElements(
  18. Tuple3.of("a", "stream-2", 3000L),
  19. Tuple3.of("b", "stream-2", 4000L)
  20. )
  21. .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
  22. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
  23. @Override
  24. public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
  25. return t.f2;
  26. }
  27. })
  28. );
  29. // 两个流合并
  30. stream1.keyBy(r -> r.f0)
  31. .connect(stream2.keyBy(r -> r.f0))
  32. .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
  33. private ListState<Tuple3<String, String, Long>> stream1ListState;
  34. private ListState<Tuple3<String, String, Long>> stream2ListState;
  35. @Override
  36. public void open(Configuration parameters) throws Exception {
  37. super.open(parameters);
  38. stream1ListState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING))
  39. );
  40. stream2ListState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple3<String, String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING))
  41. );
  42. }
  43. @Override
  44. public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
  45. stream1ListState.add(left);
  46. for (Tuple3<String, String, Long> right : stream2ListState.get()) {
  47. collector.collect(left + " => " + right);
  48. }
  49. }
  50. @Override
  51. public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
  52. stream2ListState.add(right);
  53. for (Tuple3<String, String, Long> left : stream1ListState.get()) {
  54. collector.collect(left + " => " + right);
  55. }
  56. }
  57. })
  58. .print();
  59. env.execute();
  60. }
  61. }

2.2.3 映射状态(MapState)

  1. UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
  2. put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
  3. putAll(Map<UK, UV> map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
  4. remove(UK key):将指定 key 对应的键值对删除;
  5. boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean
  6. Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
  7. Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
  8. Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
  9. boolean isEmpty():判断映射是否为空,返回一个 boolean 值。
  1. public class FakeWindowExample {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
  6. .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
  7. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  8. @Override
  9. public long extractTimestamp(Event element, long recordTimestamp) {
  10. return element.timestamp;
  11. }
  12. })
  13. );
  14. // 统计每 10s 窗口内,每个 url 的 pv
  15. stream.keyBy(data -> data.url)
  16. .process(new FakeWindowResult(10000L))
  17. .print();
  18. env.execute();
  19. }
  20. public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String> {
  21. // 定义属性,窗口长度
  22. private Long windowSize;
  23. public FakeWindowResult(Long windowSize) {
  24. this.windowSize = windowSize;
  25. }
  26. // 声明状态,用 map 保存 pv 值(窗口 start,count)
  27. MapState<Long, Long> windowPvMapState;
  28. @Override
  29. public void open(Configuration parameters) throws Exception {
  30. windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class));
  31. }
  32. @Override
  33. public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
  34. // 每来一条数据,就根据时间戳判断属于哪个窗口
  35. Long windowStart = value.timestamp / windowSize * windowSize;
  36. Long windowEnd = windowStart + windowSize;
  37. // 注册 end -1 的定时器,窗口触发计算
  38. ctx.timerService().registerEventTimeTimer(windowEnd - 1);
  39. // 更新状态中的 pv 值
  40. if (windowPvMapState.contains(windowStart)) {
  41. Long pv = windowPvMapState.get(windowStart);
  42. windowPvMapState.put(windowStart, pv + 1);
  43. } else {
  44. windowPvMapState.put(windowStart, 1L);
  45. }
  46. }
  47. // 定时器触发,直接输出统计的 pv 结果
  48. @Override
  49. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  50. Long windowEnd = timestamp + 1;
  51. Long windowStart = windowEnd - windowSize;
  52. Long pv = windowPvMapState.get(windowStart);
  53. out.collect("url: " + ctx.getCurrentKey()
  54. + " 访问量: " + pv
  55. + " 窗 口 : " + new Timestamp(windowStart) + " ~ " + new
  56. Timestamp(windowEnd));
  57. // 模拟窗口的销毁,清除 map 中的 key
  58. windowPvMapState.remove(windowStart);
  59. }
  60. }
  61. }

3. 算子状态(Operator State)

3.1 概念

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态的实际应用场景一般用在 Source 或 Sink 等与外部系统连接的算子上。

3.2 状态类型

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和BroadcastState。

3.2.1 列表状态(ListState)

在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个列表(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立
当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个大列表,然后再均匀地分配给所有并行任务。这种均匀分配的具体方法就是轮询(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一发牌的方式将状态项平均分配的。

3.2.2 联合列表状态(UnionListState)

UnionListState 的重点就在于联合(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的大列表,可以自行选择要使用的状态项和要丢弃的状态项。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

3.2.3 广播状态(BroadcastState)

所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

3.3 代码实现

在 Flink 中,对状态进行持久化保存的快照机制叫作检查点(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个 CheckpointedFunction 接口
每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以,接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑
CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext,它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的运行时上下文。FunctionInitializationContext 中提供了算子状态存储(OperatorStateStore)和按键分区状态存储(”KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 OperatorState 和 Keyed State.

  1. ListStateDescriptor<String> descriptor =
  2. new ListStateDescriptor<>(
  3. "buffered-elements",
  4. Types.of(String));
  5. ListState<String> checkpointedState =
  6. context.getOperatorStateStore().getListState(descriptor);
  1. public class BufferingSinkExample {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env =
  4. StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.setParallelism(1);
  6. SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
  7. .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
  8. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  9. @Override
  10. public long extractTimestamp(Event element, long recordTimestamp) {
  11. return element.timestamp;
  12. }
  13. })
  14. );
  15. stream.print("input");
  16. // 批量缓存输出
  17. stream.addSink(new BufferingSink(10));
  18. env.execute();
  19. }
  20. public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
  21. private final int threshold;
  22. private transient ListState<Event> checkpointedState;
  23. private List<Event> bufferedElements;
  24. public BufferingSink(int threshold) {
  25. this.threshold = threshold;
  26. this.bufferedElements = new ArrayList<>();
  27. }
  28. @Override
  29. public void invoke(Event value, Context context) throws Exception {
  30. bufferedElements.add(value);
  31. if (bufferedElements.size() == threshold) {
  32. for (Event element : bufferedElements) {
  33. // 输出到外部系统,这里用控制台打印模拟
  34. System.out.println(element);
  35. }
  36. System.out.println("==========输出完毕=========");
  37. bufferedElements.clear();
  38. }
  39. }
  40. @Override
  41. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  42. checkpointedState.clear();
  43. // 把当前局部变量中的所有元素写入到检查点中
  44. for (Event element : bufferedElements) {
  45. checkpointedState.add(element);
  46. }
  47. }
  48. @Override
  49. public void initializeState(FunctionInitializationContext context) throws Exception {
  50. ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
  51. "buffered-elements",
  52. Types.POJO(Event.class));
  53. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  54. // 如果是从故障中恢复,就将 ListState 中的所有元素添加到局部变量中
  55. if (context.isRestored()) {
  56. for (Event element : checkpointedState.get()) {
  57. bufferedElements.add(element);
  58. }
  59. }
  60. }
  61. }
  62. }

4. 广播状态(Broadcast State)

直接调用 DataStream 的.broadcast()方法,传入一个映射状态描述器(MapStateDescriptor)说明状态的名称和类型,就可以得到一个广播流(BroadcastStream);进而将要处理的数据流与这条广播流进行连接(connect),就会得到“广播连接流”(BroadcastConnectedStream)。注意广播状态只能用在广播连接流中。

  1. MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
  2. BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
  3. DataStream<String> output = stream
  4. .connect(ruleBroadcastStream)
  5. .process( new BroadcastProcessFunction<>() {
  6. ...} );
  1. public class BroadcastStateExample {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. // 读取用户行为事件流
  6. DataStreamSource<Action> actionStream = env.fromElements(
  7. new Action("Alice", "login"),
  8. new Action("Alice", "pay"),
  9. new Action("Bob", "login"),
  10. new Action("Bob", "buy")
  11. );
  12. // 定义行为模式流,代表了要检测的标准
  13. DataStreamSource<Pattern> patternStream = env.fromElements(
  14. new Pattern("login", "pay"),
  15. new Pattern("login", "buy")
  16. );
  17. // 定义广播状态的描述器,创建广播流
  18. MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
  19. BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);
  20. // 将事件流和广播流连接起来,进行处理
  21. DataStream<Tuple2<String, Pattern>> matches = actionStream
  22. .keyBy(data -> data.userId)
  23. .connect(bcPatterns)
  24. .process(new PatternEvaluator());
  25. matches.print();
  26. env.execute();
  27. }
  28. public static class PatternEvaluator extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {
  29. // 定义一个值状态,保存上一次用户行为
  30. ValueState<String> prevActionState;
  31. @Override
  32. public void open(Configuration conf) {
  33. prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
  34. }
  35. @Override
  36. public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
  37. BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
  38. // 将广播状态更新为当前的 pattern
  39. bcState.put(null, pattern);
  40. }
  41. @Override
  42. public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
  43. Pattern pattern = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
  44. String prevAction = prevActionState.value();
  45. if (pattern != null && prevAction != null) {
  46. // 如果前后两次行为都符合模式定义,输出一组匹配
  47. if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {
  48. out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
  49. }
  50. }
  51. // 更新状态
  52. prevActionState.update(action.action);
  53. }
  54. }
  55. // 定义用户行为事件 POJO 类
  56. public static class Action {
  57. public String userId;
  58. public String action;
  59. public Action() {
  60. }
  61. public Action(String userId, String action) {
  62. this.userId = userId;
  63. this.action = action;
  64. }
  65. @Override
  66. public String toString() {
  67. return "Action{" +
  68. "userId=" + userId +
  69. ", action='" + action + '\'' +
  70. '}';
  71. }
  72. }
  73. // 定义行为模式 POJO 类,包含先后发生的两个行为
  74. public static class Pattern {
  75. public String action1;
  76. public String action2;
  77. public Pattern() {
  78. }
  79. public Pattern(String action1, String action2) {
  80. this.action1 = action1;
  81. this.action2 = action2;
  82. }
  83. @Override
  84. public String toString() {
  85. return "Pattern{" +
  86. "action1='" + action1 + '\'' +
  87. ", action2='" + action2 + '\'' +
  88. '}';
  89. }
  90. }
  91. }