Java class name: com.alibaba.alink.operator.stream.sink.Export2FileSinkStreamOp
Python class name: Export2FileSinkStreamOp

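Overview

This component converts streaming data into segmented batch data, with one segment written per time window.
The output can be read back with AkSourceBatchOp or AkSourceStreamOp, which makes it straightforward to convert data between batch and stream form.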

Parameters

| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| filePath | File path | File path | String | ✓ | | |
| windowTime | Window size | Window size | String | ✓ | | |
| overwriteSink | Overwrite existing data | Whether to overwrite existing data | Boolean | | | false |
| partitionsFormat | Partition format string | Custom partition format, written in a form such as year=yyyy/month=MM/day=dd | String | | | null |
| timeCol | Timestamp column (TimeStamp) | Timestamp column (TimeStamp) | String | | | null |
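
To make the `partitionsFormat` pattern more concrete, here is a minimal sketch of how a string such as `year=yyyy/month=MM/day=dd` could expand into a directory path for a given timestamp. The helper `render_partition` is hypothetical and is not part of Alink. It assumes that only the tokens `yyyy`, `MM`, and `dd` are substituted. How Alink parses the pattern, and which timestamp it uses, is not stated in this document.

```python
from datetime import datetime


def render_partition(fmt: str, ts: datetime) -> str:
    """Hypothetical helper: expand yyyy, MM and dd tokens in a partition format.

    Only these three tokens are handled; this is an illustrative assumption,
    not Alink's actual parsing logic.
    """
    return (
        fmt.replace("yyyy", f"{ts.year:04d}")
        .replace("MM", f"{ts.month:02d}")
        .replace("dd", f"{ts.day:02d}")
    )


path = render_partition("year=yyyy/month=MM/day=dd", datetime(2021, 11, 2))
print(path)  # year=2021/month=11/day=02
```

Under this reading, each window would land in a nested `year=.../month=.../day=...` directory rather than a single flat directory name.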

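Code examples

The following code is for illustration only. You may need to modify parts of it or configure your environment before it runs correctly.

Python code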

    import datetime

    import pandas as pd
    from pyalink.alink import *  # provides StreamOperator and Export2FileSinkStreamOp

    # Run in a local environment; adjust this to match your own setup.
    useLocalEnv(1)

    filePath = "/tmp/export_2_file_sink/"

    # One record per day (2021-11-05 is absent from the data).
    data = pd.DataFrame([
        [0, datetime.datetime.fromisoformat("2021-11-01 00:00:00"), 100.0],
        [0, datetime.datetime.fromisoformat("2021-11-02 00:00:00"), 200.0],
        [0, datetime.datetime.fromisoformat("2021-11-03 00:00:00"), 300.0],
        [0, datetime.datetime.fromisoformat("2021-11-04 00:00:00"), 400.0],
        [0, datetime.datetime.fromisoformat("2021-11-06 00:00:00"), 500.0],
        [0, datetime.datetime.fromisoformat("2021-11-07 00:00:00"), 600.0],
        [0, datetime.datetime.fromisoformat("2021-11-08 00:00:00"), 700.0],
        [0, datetime.datetime.fromisoformat("2021-11-09 00:00:00"), 800.0],
        [0, datetime.datetime.fromisoformat("2021-11-10 00:00:00"), 900.0],
        [0, datetime.datetime.fromisoformat("2021-11-11 00:00:00"), 800.0],
        [0, datetime.datetime.fromisoformat("2021-11-12 00:00:00"), 700.0],
        [0, datetime.datetime.fromisoformat("2021-11-13 00:00:00"), 600.0],
        [0, datetime.datetime.fromisoformat("2021-11-14 00:00:00"), 500.0],
        [0, datetime.datetime.fromisoformat("2021-11-15 00:00:00"), 400.0],
        [0, datetime.datetime.fromisoformat("2021-11-16 00:00:00"), 300.0],
        [0, datetime.datetime.fromisoformat("2021-11-17 00:00:00"), 200.0],
        [0, datetime.datetime.fromisoformat("2021-11-18 00:00:00"), 100.0],
        [0, datetime.datetime.fromisoformat("2021-11-19 00:00:00"), 200.0],
        [0, datetime.datetime.fromisoformat("2021-11-20 00:00:00"), 300.0],
        [0, datetime.datetime.fromisoformat("2021-11-21 00:00:00"), 400.0],
        [0, datetime.datetime.fromisoformat("2021-11-22 00:00:00"), 500.0],
        [0, datetime.datetime.fromisoformat("2021-11-23 00:00:00"), 600.0],
        [0, datetime.datetime.fromisoformat("2021-11-24 00:00:00"), 700.0],
        [0, datetime.datetime.fromisoformat("2021-11-25 00:00:00"), 800.0],
        [0, datetime.datetime.fromisoformat("2021-11-26 00:00:00"), 900.0],
        [0, datetime.datetime.fromisoformat("2021-11-27 00:00:00"), 800.0],
        [0, datetime.datetime.fromisoformat("2021-11-28 00:00:00"), 700.0],
        [0, datetime.datetime.fromisoformat("2021-11-29 00:00:00"), 600.0],
        [0, datetime.datetime.fromisoformat("2021-11-30 00:00:00"), 500.0],
        [0, datetime.datetime.fromisoformat("2021-12-01 00:00:00"), 400.0],
        [0, datetime.datetime.fromisoformat("2021-12-02 00:00:00"), 300.0],
        [0, datetime.datetime.fromisoformat("2021-12-03 00:00:00"), 200.0]
    ])

    streamSource = StreamOperator.fromDataframe(
        data, schemaStr='id int, ts timestamp, val double')

    # Write one segment per 24-hour window, keyed on the "ts" column.
    export2FileSinkStreamOp = Export2FileSinkStreamOp()\
        .setFilePath(filePath)\
        .setTimeCol("ts")\
        .setWindowTime(24 * 3600)\
        .setOverwriteSink(True)

    streamSource.link(export2FileSinkStreamOp)

    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.sink.Export2FileSinkStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import org.junit.Test;

    import java.sql.Timestamp;
    import java.util.Arrays;
    import java.util.List;

    public class Export2FileSinkStreamOpTest {
        @Test
        public void testExport2FileSink() throws Exception {
            String filePath = "/tmp/export_2_file_sink/";

            // One record per day (2021-11-05 is absent from the data).
            List<Row> data = Arrays.asList(
                Row.of(0, Timestamp.valueOf("2021-11-01 00:00:00"), 100.0),
                Row.of(0, Timestamp.valueOf("2021-11-02 00:00:00"), 200.0),
                Row.of(0, Timestamp.valueOf("2021-11-03 00:00:00"), 300.0),
                Row.of(0, Timestamp.valueOf("2021-11-04 00:00:00"), 400.0),
                Row.of(0, Timestamp.valueOf("2021-11-06 00:00:00"), 500.0),
                Row.of(0, Timestamp.valueOf("2021-11-07 00:00:00"), 600.0),
                Row.of(0, Timestamp.valueOf("2021-11-08 00:00:00"), 700.0),
                Row.of(0, Timestamp.valueOf("2021-11-09 00:00:00"), 800.0),
                Row.of(0, Timestamp.valueOf("2021-11-10 00:00:00"), 900.0),
                Row.of(0, Timestamp.valueOf("2021-11-11 00:00:00"), 800.0),
                Row.of(0, Timestamp.valueOf("2021-11-12 00:00:00"), 700.0),
                Row.of(0, Timestamp.valueOf("2021-11-13 00:00:00"), 600.0),
                Row.of(0, Timestamp.valueOf("2021-11-14 00:00:00"), 500.0),
                Row.of(0, Timestamp.valueOf("2021-11-15 00:00:00"), 400.0),
                Row.of(0, Timestamp.valueOf("2021-11-16 00:00:00"), 300.0),
                Row.of(0, Timestamp.valueOf("2021-11-17 00:00:00"), 200.0),
                Row.of(0, Timestamp.valueOf("2021-11-18 00:00:00"), 100.0),
                Row.of(0, Timestamp.valueOf("2021-11-19 00:00:00"), 200.0),
                Row.of(0, Timestamp.valueOf("2021-11-20 00:00:00"), 300.0),
                Row.of(0, Timestamp.valueOf("2021-11-21 00:00:00"), 400.0),
                Row.of(0, Timestamp.valueOf("2021-11-22 00:00:00"), 500.0),
                Row.of(0, Timestamp.valueOf("2021-11-23 00:00:00"), 600.0),
                Row.of(0, Timestamp.valueOf("2021-11-24 00:00:00"), 700.0),
                Row.of(0, Timestamp.valueOf("2021-11-25 00:00:00"), 800.0),
                Row.of(0, Timestamp.valueOf("2021-11-26 00:00:00"), 900.0),
                Row.of(0, Timestamp.valueOf("2021-11-27 00:00:00"), 800.0),
                Row.of(0, Timestamp.valueOf("2021-11-28 00:00:00"), 700.0),
                Row.of(0, Timestamp.valueOf("2021-11-29 00:00:00"), 600.0),
                Row.of(0, Timestamp.valueOf("2021-11-30 00:00:00"), 500.0),
                Row.of(0, Timestamp.valueOf("2021-12-01 00:00:00"), 400.0),
                Row.of(0, Timestamp.valueOf("2021-12-02 00:00:00"), 300.0),
                Row.of(0, Timestamp.valueOf("2021-12-03 00:00:00"), 200.0)
            );

            StreamOperator<?> streamSource = new MemSourceStreamOp(data, "id int, ts timestamp, val double");

            // Write one segment per 24-hour window, keyed on the "ts" column.
            Export2FileSinkStreamOp export2FileSinkStreamOp = new Export2FileSinkStreamOp()
                .setFilePath(filePath)
                .setTimeCol("ts")
                .setWindowTime(24 * 3600)
                .setOverwriteSink(true);

            streamSource.link(export2FileSinkStreamOp);

            StreamOperator.execute();
        }
    }

Output

$ ls /tmp/export_2_file_sink
202111020000000 202111040000000 202111070000000 202111090000000 202111110000000 202111130000000 202111150000000 202111170000000 202111190000000 202111210000000 202111230000000 202111250000000 202111270000000 202111290000000 202112010000000 202112030000000
202111030000000 202111050000000 202111080000000 202111100000000 202111120000000 202111140000000 202111160000000 202111180000000 202111200000000 202111220000000 202111240000000 202111260000000 202111280000000 202111300000000 202112020000000 202112040000000
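
The listing contains 32 directories, one per input record, and each name appears to begin with the end date of the 24-hour window that holds the record: the 2021-11-01 record lands under 20211102, and there is no 20211106 entry because the data has no 2021-11-05 record. This is an inference from the listing, not a statement of how Alink names its output. The sketch below reproduces the date prefixes in plain Python, assuming epoch-aligned tumbling windows. The helper `window_end` is hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_end(ts: datetime, window_seconds: int) -> datetime:
    """Hypothetical helper: end of the epoch-aligned tumbling window containing ts."""
    elapsed = int((ts - EPOCH).total_seconds())
    window_start = elapsed - elapsed % window_seconds
    return EPOCH + timedelta(seconds=window_start + window_seconds)


# Same days as the example data: Nov 1-4, Nov 6-30, Dec 1-3 (Nov 5 is absent).
days = [datetime(2021, 11, d) for d in range(1, 31) if d != 5]
days += [datetime(2021, 12, d) for d in range(1, 4)]

# Date prefix (yyyyMMdd) of the window end for each record.
labels = [window_end(t, 24 * 3600).strftime("%Y%m%d") for t in days]

print(labels[0], labels[-1], len(labels))  # 20211102 20211204 32
print("20211106" in labels)  # False
```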