在线机器学习与离线相比,在模型更新的时效性,模型的迭代周期,业务实验效果等方面有更好的表现。所以将机器学习从离线迁移到在线已经成为提升业务指标的一个有效的手段。
在线机器学习中,样本是关键的一环。本文将给大家详细的介绍微博是如何用 Flink 来实现在线样本生成的。
为何选择 Flink 来做在线的样本生成?
在线样本生成对样本的时效性和准确性都有极高的要求。同样对作业的稳定性及是否容灾也都有严格的指标要求。基于这个前提,我们对目前较为流行的几种实时计算框架(Storm 0.10, Spark 2.11, Flink 1.10)进行了分析比较,结论如下:
因此,我们决定使用 Flink 来作为在线样本生成的实时流计算框架。
如何实现?
在线样本生成,简单描述一个业务场景:对用户的曝光数据和点击数据实时的做关联,关联后将数据输出到 Kafka 中,给下游的在线训练作业用。
首先我们要确定两个数据流关联的时间窗口。这一步一般建议先离线对两个数据流的日志做关联,通过离线的方式对两份数据在不同的时间范围内做 join,来判断在线需要的时间窗口。比如业务接受的最低关联比例是 85%,并且通过离线测试确认 20 分钟内两个数据流可以关联 85%的数据,那么就可以采用 20 分钟作为时间窗口。这里的关联比例和窗口时间实际上是在准确性和实时性之间的一个 trade-off。
确定时间窗口后,我们并没有使用 Flink 的 time window 来实现多个数据流的 join,而是选择采用 union + timer 方式来实现。这里主要考虑两点:第一、Flink 自带的 join 操作不支持多个数据流。第二、使用 timer+state 来实现,自定义程度更高,限制更少,也更方便。
接下来,我们把样本生成过程细分为:
① 输入数据流
一般我们的数据源包括 Kafka,Trigger,MQ 等。Flink 需要从数据源中实时的读取日志。
② 输入数据流的格式化和过滤
- 读取日志后,对数据做格式化,并且过滤掉不需要的字段和数据。
- 指定样本 join 的 key。例如:用户 id 和 内容 id 作 key。
- 输出的数据格式一般为 tuple2(K,V),K:参与 join 的 key。V:样本用到的字段。
③ 输入数据流的 union
- 使用 Flink 的 union 操作,将多个输入流叠加到一起,形成一个 DataStream。
- 为每个输入流指定一个可以区分的别名或者增加一个可以区分的字段。
④ 输入数据流的聚合:keyby 操作
- 对 join 的 key 做 keyby 操作。接上例,表示按照用户 id 和内容 id 对多个数据流做 join。
- 如果 key 存在数据倾斜的情况,建议对 key 加随机数后先聚合,去掉随机数后再次聚合。
⑤ 数据存储 state + timer
- 定义一个Value State。
- keyby后的process方法中,我们会重写processElement方法,在processElement方法中判断,如果value state为空,则new 一个新的state,并将数据写到value state中,并且为这条数据注册一个timer(timer会由Flink按key+timestamp自动去重),另外此处我们使用的是ProcessingTime(表示onTimer()在系统时间戳达到Timer设定的时间戳时触发)。如果不为空则按照拼接的策略,更新已经存在的结果。比如:时间窗口内 用户id1,内容id1的第一条日志数据没有点击行为,则这个字段为0,第二条点击数据进入后,将这个字段更新为1。当然除了更新操作,还有计数、累加、均值等各种操作。如何在process里区分数据是来自曝光还是点击呢,使用上面步骤③定义的别名。
- 重写onTimer方法,在onTimer方法中主要是定义定时器触发时执行的逻辑:从value state里获取到存入的数据,并将数据输出。然后执行state.clear。
- 样本从窗口输出的条件有2个:第一,timer到期。第二,业务需要的样本都拼接上了。
此处参考伪代码:
public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
/**
* 这个状态是通过过程函数来维护,使用ValueState
*/
private ValueState state;
private Long timer = null;
public StateSampleFunction (String time){
timer = Long.valueOf(time);
}
@Override
public void open(Configuration parameters) throws Exception {
// 获取state
state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
}
@Override
public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {
if (value.f0 == null){
return;
}
Object sampleValue = value.f1;
Long time = context.timerService().currentProcessingTime();
ReturnSample returnSample = state.value();
if (returnSample == null) {
returnSample = new ReturnSample();
returnSample.setKey(value.f0);
returnSample.setTime(time);
context.timerService().registerProcessingTimeTimer(time +timer);
}
// 更新点击数据到state里
if (sampleValue instanceof ClickLog){
ClickLog clickLog = (ClickLog)values;
returnSample =(ReturnSample) clickLog.setSample(returnSample);
}
state.update(returnSample);
}
/**
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {
ReturnSample value = state.value();
state.clear();
out.collect(value);
}
}
⑥ 拼接后的日志格式化和过滤
- 拼接后的数据需要按照在线训练作业的要求对数据做格式化,比如 json、CSV 等格式。
- 过滤:决定什么样的数据是合格的样本。例如:有真正阅读的内容才算是可用的样本。
⑦ 输出
样本最终输出到实时的数据队列中。下面是实际的作业拓扑和运行时状态:
整个样本拼接过程的流程图:
StateBackend 的选取
使用 RocksDB/Gemini 作为 state 的 Backend 的优势和建议:
我们用大数据对 memory 和 RocksDB,Gemini 做了实验对比,结果显示 RocksDB 和 Gemin 在数据处理,作业稳定性和资源使用等方面比 memory 更合理。其中 Gemini 的优势最为明显。
此外,如果是大数据量的 state,建议使用 Gemini + SSD 固态硬盘。
样本的监控
1. Flink 作业的异常监控
- 作业失败监控
- Failover 监控
- Checkpoint 失败的监控
- RocksDB 使用情况的监控
- 作业消费 Kafka 的 Comsumer Lag 的监控
- 作业反压的监控
2. 样本输入端 Kafka 的消费延迟监控3. 样本输出端 Kafka 的写入量的监控
4. 样本监控**
- 拼接率监控
- 正样本监控
- 输出样本格式的监控
- 输出标签对应的值是否在正常范围
- 输入标签对应的值是否为 null
- 输出标签对应的值是否为空
- 输出标签对应的值是否在正常范围
样本的校验
样本生成后,如何验证数据是否准确
- 在线和离线的相互校验
将在线样本从输出的 Kafka 中接入到 HDFS 上离线存储。并按照在线 join 的时间窗口来分区。 - 用同等条件下生成的离线样本和在线样本做对比
- 白名单用户的全流程校验
将白名单用户的日志和样本结果存入 ES 等实时数仓中,来做校验。
故障的处理
样本异常对线上模型训练的影响非常大。当发现异常报警时,首先要做的是向在线模型训练作业发送样本异常的报警。收到报警信息后,模型停止更新。从而避免影响模型线上效果。
普通意义的业务故障解决后,丢弃原来的数据,所有输入日志流从最新的时间点开始消费并生成新的样本即可。重要业务需要重置输入日志流的 Kafka offset 从故障时间点开始重新生成样本数据。
平台化
通过平台化对样本生成的流程做出严格的规范非常重要。在平台化的过程中,需要提供简单通用的开发模板以提高作业开发效率;提供平台化的作业监控和样本指标监控框架,避免重复造车;提供通用的样本输出落地策略,和在线/离线校验策略,更便捷的为业务方服务。
微博基于 Flink 搭建的在线样本生成平台架构,如图:
UI 页面,如图:
基于平台化开发,用户只需要关心业务逻辑部分即可。需要用户开发的有:
- 对应输入数据的数据清洗逻辑
- 样本输出前的数据清洗逻辑
其余的在 UI 上配置即可实现,具体有:
- 输入 Kafka 的配置信息及对应数据清洗的 UDF 类
- 样本拼接的时间窗口
- 窗口内对字段的聚合操作
- 样本输出的 Kafka 配置信息及输出前数据清洗和格式化的 UDF 类
资源情况由平台方审核并配置。完成后,自动生成并提交作业。
作业提交后:
1. 平台会提供如前所述的作业相关监控,如下:
■ Flink 作业的异常监控**
- 作业失败监控
- Failover 监控
- Checkpoint 失败的监控
- RocksDB 使用情况的监控
- 作业消费 Kafka 的 Comsumer Lag 的监控
- 作业反压的监控
■ 样本监控
- 拼接率监控
- 正样本监控
- 输出样本格式的监控
- 输出标签对应的值是否在正常范围
- 输入标签对应的值是否为 null
- 输出标签对应的值是否为空
- 输出标签对应的值是否在正常范围
2. 平台会自动将数据落盘,存储到HDFS上。方便离线验证或者离线训练。
3. 用户只需将精力放到样本的验证上即可,由平台方保证作业的稳定性。
作者介绍:
曹富强,微博机器学习研发中心-高级系统工程师。现负责微博机器学习平台数据计算/数据存储模块,主要涉及实时计算 Flink、Storm、Spark Streaming,数据存储Kafka、Redis,离线计算 Hive、Spark 等。目前专注于 Flink/Kafka/Redis 在微博机器学习场景的应用,为机器学习提供框架,技术,应用层面的支持。