在实际应用中,我们常需要将某个时间间隔(1小时,5分钟,10秒等间隔)的数据输出到一个文件,该数据文件随后可以被放入批式处理、增量训练等流程。
Alink提供了Export2FileSinkStreamOp组件,可以按指定的时间间隔,将数据保存到本地或远程的文件系统,每个数据文件都是AK格式。下面先介绍组件的参数:

  • 参数WindowTime,设置间隔时间,以秒为单位,是必填参数。
  • 参数FilePath,保存数据的文件夹名称,是必填参数
  • 参数TimeCol,为时间列名称,是可选参数。如果没有设置该参数,会以流式任务运行节点的本地时间,计算间隔、输出数据文件。
  • 参数OverwriteSink,为布尔型参数,默认值为false,即当发现参数“FilePath”指定的文件夹存在时会报错;如果设置为true,则会将文件夹中的文件全部删除,再写入新的数据文件。


�下面会以是否指定TimeCol参数,构建两个实例,帮助读者了解组件的功能。

�3.2.5.1 按本地时间定时输出流式数据

我们以MovieLens的一个评分数据为例,关于该数据的介绍,请参见24.3节,这里不再重复。使用TsvSourceStreamOp组件,可以直接从网络地址读取数据,具体代码如下:

  1. source = TsvSourceStreamOp()\
  2. .setFilePath("http://files.grouplens.org/datasets/movielens/ml-100k/u.data")\
  3. .setSchemaStr("user_id long, item_id long, rating float, ts long")\
  4. .link(
  5. UDFStreamOp()\
  6. .setFunc(from_unix_timestamp)\
  7. .setSelectedCols(["ts"])\
  8. .setOutputCol("ts")
  9. )

其中用户自定义函数FromUnixTimestamp的定义如下,该函数用于将原始数据中的long型表示时间的数据转换为Timestamp格式。

  1. import datetime
  2. @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3))
  3. def from_unix_timestamp(ts):
  4. return datetime.datetime.fromtimestamp(ts)

然后,将流式source连接Export2FileSinkStreamOp组件,并设置参数,具体代码如下,这里没有设置TimeCol参数,意味着组件会按本地时间进行数据保存,并按WindowTime参数的设置,每隔5秒保存一次文件。

  1. source.link(
  2. Export2FileSinkStreamOp()\
  3. .setFilePath(LOCAL_DIR + "with_local_time")\
  4. .setWindowTime(5)\
  5. .setOverwriteSink(True)
  6. )

我们还连接了一个流式的Sink组件,用于准备后面的实验,具体代码如下。

  1. source.link(
  2. AkSinkStreamOp()\
  3. .setFilePath(LOCAL_DIR + "ratings.ak")\
  4. .setOverwriteSink(True)
  5. )

执行流式任务,我们看到新建了名称为”with_local_time”的文件夹,具体内容详见下面的截图。其中包含了5个文件,都是AK格式,文件名是以间隔区间的结束时间点命名的,譬如:“202204211524050”对应时间:2022年4月21日15:24:05.0
image.png

最后,再做个简单的验证,统计一下”with_local_time”文件夹下的数据,详细代码如下:

  1. AkSourceBatchOp()\
  2. .setFilePath(LOCAL_DIR + "with_local_time")\
  3. .lazyPrintStatistics("Statistics for data in the folder 'with_local_time' : ")
  4. BatchOperator.execute()

运行结果如下,共100000条数据,与原始数据集的条数相同。

  1. Statistics for data in the folder 'with_local_time' :
  2. Summary:
  3. |colName| count|missing| sum| mean| variance|min| max|
  4. |-------|------|-------|--------|--------|-----------|---|----|
  5. |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943|
  6. |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682|
  7. | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5|
  8. | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|

�3.2.5.2 按数据的时间列,定时输出流式数据

在实验前,先做一个数据准备工作,ratings.ak中的各条数据,从其ts时间列来看,几乎是随机分布在数据集中,我们相对其进行排序,并存入ratings_ordered.ak文件。具体代码如下:

  1. AkSourceBatchOp()\
  2. .setFilePath(LOCAL_DIR + "ratings.ak")\
  3. .orderBy("ts", 1000000)\
  4. .lazyPrintStatistics("Statistics for data in the file 'ratings.ak' : ")\
  5. .link(
  6. AkSinkBatchOp()\
  7. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
  8. .setOverwriteSink(True)
  9. )
  10. BatchOperator.execute()


以流的方式读入准备好的数据,然后连接Export2FileSinkStreamOp组件,设置时间列为 ts 列,并设置时间间隔为3600X24秒,即1天。具体代码如下:

  1. AkSourceStreamOp()\
  2. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
  3. .link(
  4. Export2FileSinkStreamOp()\
  5. .setFilePath(LOCAL_DIR + "with_ts_time")\
  6. .setTimeCol("ts")\
  7. .setWindowTime(3600 * 24)\
  8. .setOverwriteSink(True)
  9. )
  10. StreamOperator.execute()

访问”with_local_time”文件夹,查看运行结果,具体内容详见下面的截图,每个文件保存了一天的数据。
image.png

再读取数据,进行深入验证,一方面读取整个文件夹的数据,进行统计;另一方面,选择一个数据文件,打印输出,查看起时间列的信息。详细代码如下:

  1. AkSourceBatchOp()\
  2. .setFilePath(LOCAL_DIR + "with_ts_time")\
  3. .lazyPrintStatistics("Statistics for data in the folder 'with_ts_time' : ")
  4. AkSourceBatchOp()\
  5. .setFilePath(LOCAL_DIR + "with_ts_time" + os.sep + "199709210000000")\
  6. .print()

运行结果如下,”with_local_time”文件夹下包含100000条数据,而且各项统计指标也与 ratings.ak 的也相同;

  1. Statistics for data in the folder 'with_ts_time' :
  2. Summary:
  3. |colName| count|missing| sum| mean| variance|min| max|
  4. |-------|------|-------|--------|--------|-----------|---|----|
  5. |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943|
  6. |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682|
  7. | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5|
  8. | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|

文件199709210000000中的各数据显示如下图所示,都为1997-09-20的,也符合预期。
image.png

3.2.5.3 输出带分区的数据

在文件系统上存储长期的数据,经常将时间作为分区名称,通过选择分区名称,选择部分数据进行操作。Export2FileSinkStreamOp组件可以通过指定参数PartitionsFormat来定义输出的分区格式。如下面代码所示,”year=yyyy/month=MM/day=dd”表明输出三级文件夹,第一级文件夹的名称以“year=”开头,后面是4位的年份信息;第二级文件夹的名称以“month=”开头,后面是2位的月份信息,譬如:1月表示为“01”,12月表示为“12”;第三级文件夹的名称以“day=”开头,后面是2位的天信息,譬如:5号表示为“05”,31号表示为“31”。

  1. AkSourceStreamOp()\
  2. .setFilePath(LOCAL_DIR + "ratings_ordered.ak")\
  3. .link(
  4. Export2FileSinkStreamOp()\
  5. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  6. .setTimeCol("ts")\
  7. .setWindowTime(3600 * 24)\
  8. .setPartitionsFormat("year=yyyy/month=MM/day=dd")\
  9. .setOverwriteSink(True)
  10. )
  11. StreamOperator.execute()

运行结束后,我们可以在文件夹看到如下结构:
image.png