在实际应用中,我们常需要将某个时间间隔(1小时,5分钟,10秒等间隔)的数据输出到一个文件,该数据文件随后可以被放入批式处理、增量训练等流程。
Alink提供了Export2FileSinkStreamOp组件,可以按指定的时间间隔,将数据保存到本地或远程的文件系统,每个数据文件都是AK格式。下面先介绍组件的参数:

  • 参数WindowTime,设置间隔时间,以秒为单位,是必填参数。
  • 参数FilePath,保存数据的文件夹名称,是必填参数
  • 参数TimeCol,为时间列名称,是可选参数。如果没有设置该参数,会以流式任务运行节点的本地时间,计算间隔、输出数据文件。
  • 参数OverwriteSink,为布尔型参数,默认值为false,即当发现参数“FilePath”指定的文件夹存在时会报错;如果设置为true,则会将文件夹中的文件全部删除,再写入新的数据文件。


�下面会以是否指定TimeCol参数,构建两个实例,帮助读者了解组件的功能。

�3.2.5.1 按本地时间定时输出流式数据

我们以MovieLens的一个评分数据为例,关于该数据的介绍,请参见24.3节,这里不再重复。使用TsvSourceStreamOp组件,可以直接从网络地址读取数据,具体代码如下:

  1. StreamOperator <?> source = new TsvSourceStreamOp()
  2. .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")
  3. .setSchemaStr("user_id long, item_id long, rating float, ts long")
  4. .udf("ts", "ts", new FromUnixTimestamp());

其中用户自定义函数FromUnixTimestamp的定义如下,该函数用于将原始数据中的long型表示时间的数据转换为Timestamp格式。

  1. public static class FromUnixTimestamp extends ScalarFunction {
  2. public java.sql.Timestamp eval(Long ts) {
  3. return new java.sql.Timestamp(ts * 1000);
  4. }
  5. }

然后,将流式source连接Export2FileSinkStreamOp组件,并设置参数,具体代码如下,这里没有设置TimeCol参数,意味着组件会按本地时间进行数据保存,并按WindowTime参数的设置,每隔5秒保存一次文件。

  1. source.link(
  2. new Export2FileSinkStreamOp()
  3. .setFilePath(LOCAL_DIR + "with_local_time")
  4. .setWindowTime(5)
  5. .setOverwriteSink(true)
  6. );

我们还连接了一个流式的Sink组件,用于准备后面的实验,具体代码如下。

  1. source.link(
  2. new AkSinkStreamOp()
  3. .setFilePath(LOCAL_DIR + "ratings.ak")
  4. .setOverwriteSink(true)
  5. );

执行流式任务,我们看到新建了名称为”with_local_time”的文件夹,具体内容详见下面的截图。其中包含了5个文件,都是AK格式,文件名是以间隔区间的结束时间点命名的,譬如:“202204211524050”对应时间:2022年4月21日15:24:05.0
image.png

最后,再做个简单的验证,统计一下”with_local_time”文件夹下的数据,详细代码如下:

  1. new AkSourceBatchOp()
  2. .setFilePath(LOCAL_DIR + "with_local_time")
  3. .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ");
  4. BatchOperator.execute();

运行结果如下,共100000条数据,与原始数据集的条数相同。

  1. Statistics for data in the folder 'with_local_time' :
  2. Summary:
  3. |colName| count|missing| sum| mean| variance|min| max|
  4. |-------|------|-------|--------|--------|-----------|---|----|
  5. |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943|
  6. |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682|
  7. | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5|
  8. | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|

�3.2.5.2 按数据的时间列,定时输出流式数据

在实验前,先做一个数据准备工作,ratings.ak中的各条数据,从其ts时间列来看,几乎是随机分布在数据集中,我们相对其进行排序,并存入ratings_ordered.ak文件。具体代码如下:

  1. new AkSourceBatchOp()
  2. .setFilePath(LOCAL_DIR + "ratings.ak")
  3. .orderBy("ts", 1000000)
  4. .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")
  5. .link(
  6. new AkSinkBatchOp()
  7. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
  8. .setOverwriteSink(true)
  9. );
  10. BatchOperator.execute();


以流的方式读入准备好的数据,然后连接Export2FileSinkStreamOp组件,设置时间列为 ts 列,并设置时间间隔为3600X24秒,即1天。具体代码如下:

  1. new AkSourceStreamOp()
  2. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
  3. .link(
  4. new Export2FileSinkStreamOp()
  5. .setFilePath(LOCAL_DIR + "with_ts_time")
  6. .setTimeCol("ts")
  7. .setWindowTime(3600 * 24)
  8. .setOverwriteSink(true)
  9. );
  10. StreamOperator.execute();

访问”with_local_time”文件夹,查看运行结果,具体内容详见下面的截图,每个文件保存了一天的数据。
image.png

再读取数据,进行深入验证,一方面读取整个文件夹的数据,进行统计;另一方面,选择一个数据文件,打印输出,查看起时间列的信息。详细代码如下:

  1. new AkSourceBatchOp()
  2. .setFilePath(LOCAL_DIR + "with_ts_time")
  3. .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ");
  4. new AkSourceBatchOp()
  5. .setFilePath(LOCAL_DIR + "with_ts_time" + File.separator + "199709210000000")
  6. .print();

运行结果如下,”with_local_time”文件夹下包含100000条数据,而且各项统计指标也与 ratings.ak 的也相同;文件199709210000000中的各数据都为1997-09-20的,也符合预期。

  1. Statistics for data in the folder 'with_ts_time' :
  2. Summary:
  3. |colName| count|missing| sum| mean| variance|min| max|
  4. |-------|------|-------|--------|--------|-----------|---|----|
  5. |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943|
  6. |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682|
  7. | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5|
  8. | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|
  9. user_id|item_id|rating|ts
  10. -------|-------|------|---
  11. 259|185|4.0000|1997-09-20 11:06:21.0
  12. 259|357|5.0000|1997-09-20 11:18:05.0
  13. 712|1040|4.0000|1997-09-20 12:28:02.0
  14. 712|553|5.0000|1997-09-20 12:30:50.0
  15. 712|174|5.0000|1997-09-20 12:33:15.0
  16. 712|462|3.0000|1997-09-20 12:34:45.0
  17. 712|365|3.0000|1997-09-20 12:37:14.0
  18. 712|393|3.0000|1997-09-20 12:38:40.0
  19. 712|404|3.0000|1997-09-20 12:41:07.0
  20. 712|95|4.0000|1997-09-20 12:42:32.0
  21. ......
  22. 259|173|4.0000|1997-09-20 11:07:23.0
  23. 851|687|2.0000|1997-09-20 12:02:48.0
  24. 712|510|2.0000|1997-09-20 12:29:09.0
  25. 712|173|5.0000|1997-09-20 12:31:41.0
  26. 712|692|5.0000|1997-09-20 12:33:15.0
  27. 712|195|3.0000|1997-09-20 12:34:45.0
  28. 712|585|4.0000|1997-09-20 12:37:14.0
  29. 712|584|4.0000|1997-09-20 12:39:02.0
  30. 712|655|5.0000|1997-09-20 12:41:07.0
  31. 712|747|3.0000|1997-09-20 12:42:32.0

3.2.5.3 输出带分区的数据

在文件系统上存储长期的数据,经常将时间作为分区名称,通过选择分区名称,选择部分数据进行操作。Export2FileSinkStreamOp组件可以通过指定参数PartitionsFormat来定义输出的分区格式。如下面代码所示,”year=yyyy/month=MM/day=dd”表明输出三级文件夹,第一级文件夹的名称以“year=”开头,后面是4位的年份信息;第二级文件夹的名称以“month=”开头,后面是2位的月份信息,譬如:1月表示为“01”,12月表示为“12”;第三级文件夹的名称以“day=”开头,后面是2位的天信息,譬如:5号表示为“05”,31号表示为“31”。

  1. new AkSourceStreamOp()
  2. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")
  3. .link(
  4. new Export2FileSinkStreamOp()
  5. .setFilePath(LOCAL_DIR + "data_with_partitions")
  6. .setTimeCol("ts")
  7. .setWindowTime(3600 * 24)
  8. .setPartitionsFormat("year=yyyy/month=MM/day=dd")
  9. .setOverwriteSink(true)
  10. );
  11. StreamOperator.execute();

运行结束后,我们可以在文件夹看到如下结构:
image.png