AkSourceBatchOp/StreamOp、CsvSourceBatchOp/StreamOp、ParquetSourceBatchOp/StreamOp等批式或流式数据源组件,都支持选择分区读取数据。

  1. 分区目录名格式为"分区名=值",例如: month=06/day=17;month=06/day=18Alink将遍历目录下的分区名和分区值,构造分区表:
month day
06 17
06 18

使用SQL语句查找分区,例如:AkSourceBatchOp.setPartitions(“day = ‘17’”),分区选择语法参考《Flink SQL 内置函数》,分区值为String类型。

上节的最后,使用Export2FileSinkStreamOp组件生成了分区数据;本节将以AkSourceBatchOp/StreamOp为例,演示选择分区数据进行读取。

�3.2.6.1 按分区读取批式数据

下面的例子中,我们先读取整个根文件夹的数据,随后,通过设置参数Partitions,选择不同的分区。”year=’1997’”表示选择年份为1997的数据;使用更复杂的表达式”year=’1997’ AND month>=’10’”,表示1997年中月份数大于10的月份,即选择1997年的最后3个月。具体代码如下:

  1. AkSourceBatchOp()\
  2. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  3. .lazyPrintStatistics("Statistics for data in the folder 'data_with_partitions' : ")
  4. AkSourceBatchOp()\
  5. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  6. .setPartitions("year='1997'")\
  7. .lazyPrintStatistics("Statistics for data of year=1997 : ")
  8. AkSourceBatchOp()\
  9. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  10. .setPartitions("year='1997' AND month>='10'")\
  11. .lazyPrintStatistics("Statistics for data of year 1997's last 3 months : ")
  12. BatchOperator.execute()

输入各个数据集的统计结果如下,整个数据集有10000条记录;1997年的数据有52604条记录;1997年后三个月的数据有46493条记录。有的读者可能会有疑问,为什么后三个月的数据占据了绝大多数?因为数据集的记录是从9月份开始的。

  1. Statistics for data in the folder 'data_with_partitions' :
  2. Summary:
  3. |colName| count|missing| sum| mean| variance|min| max|
  4. |-------|------|-------|--------|--------|-----------|---|----|
  5. |user_id|100000| 0|46248475|462.4848| 71083.249| 1| 943|
  6. |item_id|100000| 0|42553013|425.5301|109427.5525| 1|1682|
  7. | rating|100000| 0| 352986| 3.5299| 1.2671| 1| 5|
  8. | ts|100000| 0| NaN| NaN| NaN|NaN| NaN|
  9. Statistics for data of year=1997 :
  10. Summary:
  11. |colName|count|missing| sum| mean| variance|min| max|
  12. |-------|-----|-------|--------|--------|-----------|---|----|
  13. |user_id|52604| 0|24228648|460.5857| 73330.1483| 1| 943|
  14. |item_id|52604| 0|21680120|412.1382|106729.2902| 1|1682|
  15. | rating|52604| 0| 187718| 3.5685| 1.2213| 1| 5|
  16. | ts|52604| 0| NaN| NaN| NaN|NaN| NaN|
  17. Statistics for data of year 1997's last 3 months :
  18. Summary:
  19. |colName|count|missing| sum| mean| variance|min| max|
  20. |-------|-----|-------|--------|--------|-----------|---|----|
  21. |user_id|46493| 0|21567932|463.8963| 72816.402| 1| 939|
  22. |item_id|46493| 0|19225958|413.5237|106324.0118| 1|1682|
  23. | rating|46493| 0| 166044| 3.5714| 1.2245| 1| 5|
  24. | ts|46493| 0| NaN| NaN| NaN|NaN| NaN|

我们还可以使用更多的方式来选择分区,譬如:”day LIKE ‘3_’”表示第一位为字符“3”,第二位可以为任意的字符;”day LIKE ‘%3%’”表示字符串中要包含字符“3”。完整示例代码如下:

  1. AkSourceBatchOp()\
  2. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  3. .setPartitions("day LIKE '3_'")\
  4. .lazyPrint(10, ">>> day LIKE '3_'")
  5. AkSourceBatchOp()\
  6. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  7. .setPartitions("day LIKE '%3%'")\
  8. .print(10, ">>> day LIKE '%3%'")

运行结果如下,结果数据满足我们的预期。
image.png

�3.2.6.2 按分区读取流式数据

对于流式组件,参数Partitions的设置与批式组件相同。这里,我们只举一个简单的例子,使用AkSourceStreamOp组件,选择日期为1号和2号的数据,对应的表达式为:”day IN(‘01’, ‘02’)”。具体代码如下:

  1. AkSourceStreamOp()\
  2. .setFilePath(LOCAL_DIR + "data_with_partitions")\
  3. .setPartitions("day IN('01', '02')")\
  4. .sample(0.001)\
  5. .print()
  6. StreamOperator.execute()


运行结果如下,显示的数据都为1号或者2号的,也符合预期。
image.png