Java class name: com.alibaba.alink.operator.stream.sink.TsvSinkStreamOp
Python class name: TsvSinkStreamOp

Description

Writes a stream to a TSV file. A TSV file is a text file whose fields are separated by the tab character.
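To make the file format concrete, the following sketch serializes two of the sample rows used later on this page into tab-separated text. It uses only Python's standard `csv` module and does not call Alink; the helper name `rows_to_tsv` is hypothetical, and the exact number formatting that TsvSinkStreamOp produces may differ.

```python
import csv
import io


def rows_to_tsv(rows):
    """Serialize rows to tab-separated text, one record per line (illustrative helper)."""
    buffer = io.StringIO()
    # delimiter="\t" is what makes this TSV rather than CSV
    writer = csv.writer(buffer, delimiter="\t", lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


rows = [
    ["0L", "1L", 0.6],
    ["2L", "2L", 0.8],
]
tsv_text = rows_to_tsv(rows)
print(repr(tsv_text))
```

Printing with `repr` makes the tab (`\t`) and newline (`\n`) characters visible, which is useful when checking that a file really is tab-delimited.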

Parameters

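| Name | Display name | Description | Type | Required? | Allowed values | Default |
| --- | --- | --- | --- | --- | --- | --- |
| filePath | File path | Path of the file to write | String | ✓ | | |
| numFiles | Number of files | Number of files to write | Integer | | | 1 |
| overwriteSink | Overwrite existing data | Whether to overwrite existing data | Boolean | | | false |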
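The sketch below illustrates what an overwrite flag of this kind usually means. It is plain Python, not Alink code: the helper `write_tsv` and its `overwrite` argument are hypothetical, and it assumes that a sink with `overwriteSink` left at `false` refuses to replace an existing file, which this page does not state explicitly.

```python
import csv
import os
import tempfile


def write_tsv(path, rows, overwrite=False):
    """Hypothetical helper: write rows as TSV, refusing to replace an existing file
    unless overwrite is True (an assumed reading of the overwriteSink parameter)."""
    if os.path.exists(path) and not overwrite:
        raise FileExistsError(f"{path} already exists; pass overwrite=True to replace it")
    with open(path, "w", newline="") as handle:
        csv.writer(handle, delimiter="\t", lineterminator="\n").writerows(rows)


demo_path = os.path.join(tempfile.mkdtemp(), "abc.tsv")

# First write succeeds because the file does not exist yet
write_tsv(demo_path, [["0L", "1L", 0.6]])

# Second write without the flag is rejected
try:
    write_tsv(demo_path, [["2L", "2L", 0.8]])
    rejected = False
except FileExistsError:
    rejected = True

# An explicit overwrite replaces the earlier contents
write_tsv(demo_path, [["2L", "2L", 0.8]], overwrite=True)
with open(demo_path) as handle:
    final_text = handle.read()
```

Under this reading, setting the flag to true is the convenient choice for examples that are run repeatedly against the same path, as the code examples on this page do.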

Code examples

Python code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    # Sample data: user id, item id, label
    df = pd.DataFrame([
        ["0L", "1L", 0.6],
        ["2L", "2L", 0.8],
        ["2L", "4L", 0.6],
        ["3L", "1L", 0.6],
        ["3L", "2L", 0.3],
        ["3L", "4L", 0.4]
    ])
    source = StreamOperator.fromDataframe(df, schemaStr='uid string, iid string, label double')

    filepath = "/tmp/abc.tsv"

    # Write the stream to a TSV file, overwriting any existing data
    tsvSink = TsvSinkStreamOp()\
        .setFilePath(filepath)\
        .setOverwriteSink(True)
    source.link(tsvSink)
    StreamOperator.execute()

    # Read the file back with TsvSourceStreamOp and print the result
    tsvSource = TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string")
    tsvSource.print()
    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.sink.TsvSinkStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import com.alibaba.alink.operator.stream.source.TsvSourceStreamOp;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class TsvSinkStreamOpTest {
        @Test
        public void testTsvSinkStreamOp() throws Exception {
            // Sample data: user id, item id, label
            List<Row> df = Arrays.asList(
                Row.of("0L", "1L", 0.6),
                Row.of("2L", "2L", 0.8),
                Row.of("2L", "4L", 0.6),
                Row.of("3L", "1L", 0.6),
                Row.of("3L", "2L", 0.3),
                Row.of("3L", "4L", 0.4)
            );
            StreamOperator<?> source = new MemSourceStreamOp(df, "uid string, iid string, label double");
            String filepath = "/tmp/abc.tsv";

            // Write the stream to a TSV file, overwriting any existing data
            StreamOperator<?> tsvSink = new TsvSinkStreamOp()
                .setFilePath(filepath)
                .setOverwriteSink(true);
            source.link(tsvSink);
            StreamOperator.execute();

            // Read the file back with TsvSourceStreamOp and print the result
            StreamOperator<?> tsvSource = new TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string");
            tsvSource.print();
            StreamOperator.execute();
        }
    }