Java class name: com.alibaba.alink.operator.stream.source.TsvSourceStreamOp
Python class name: TsvSourceStreamOp

Overview

Reads a tab-delimited TSV file line by line.

Parameters

| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| filePath | File path | Path of the file to read | String | ✓ | | |
| schemaStr | Schema | Schema, in the format "colname coltype[, colname2 coltype2[, …]]", for example "f0 string, f1 bigint, f2 double" | String | ✓ | | |
| ignoreFirstLine | Ignore first line | Whether to ignore the first line of data | Boolean | | | false |
| skipBlankLine | Skip blank lines | Whether to skip blank lines | Boolean | | | true |
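To make the parameter semantics concrete, here is a minimal pure-Python sketch of a reader with the same four options. It is not Alink's implementation: `parse_schema`, `read_tsv`, `demo`, and the `_CONVERTERS` type mapping are hypothetical names introduced for illustration, and dropping fields beyond the declared schema is an assumption of this sketch.

```python
import os
import tempfile

# Hypothetical mapping from schema type names to Python converters (illustration only).
_CONVERTERS = {"string": str, "bigint": int, "double": float}


def parse_schema(schema_str):
    """Split 'f0 string, f1 bigint' into [('f0', 'string'), ('f1', 'bigint')]."""
    columns = []
    for part in schema_str.split(","):
        name, col_type = part.split()
        columns.append((name, col_type.lower()))
    return columns


def read_tsv(file_path, schema_str, ignore_first_line=False, skip_blank_line=True):
    """Read a tab-separated file line by line into a list of typed rows."""
    columns = parse_schema(schema_str)
    rows = []
    with open(file_path, encoding="utf-8") as f:
        for index, line in enumerate(f):
            line = line.rstrip("\r\n")
            if ignore_first_line and index == 0:
                continue  # drop the first line (typically a header)
            if skip_blank_line and line == "":
                continue  # drop empty lines
            fields = line.split("\t")
            # zip stops at the shorter sequence, so fields beyond the
            # declared schema are dropped (an assumption of this sketch).
            rows.append([_CONVERTERS[t](v) for (_, t), v in zip(columns, fields)])
    return rows


def demo():
    """Write a small TSV file with a header and a blank line, then read it back."""
    content = "uid\tiid\tlabel\n0L\t1L\t0.6\n\n2L\t2L\t0.8\n"
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "abc.tsv")
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        return read_tsv(path, "uid string, iid string, label double",
                        ignore_first_line=True)


if __name__ == "__main__":
    print(demo())
```

With `ignore_first_line=True` the header row is dropped, and the default `skip_blank_line=True` removes the empty line, so only the two data rows remain.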

Code examples

Python code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    df = pd.DataFrame([
        ["0L", "1L", 0.6],
        ["2L", "2L", 0.8],
        ["2L", "4L", 0.6],
        ["3L", "1L", 0.6],
        ["3L", "2L", 0.3],
        ["3L", "4L", 0.4]
    ])

    # Build a stream source from the in-memory DataFrame.
    source = StreamOperator.fromDataframe(df, schemaStr='uid string, iid string, label double')

    # Write the data to a TSV file, overwriting any existing file.
    filepath = "/tmp/abc.tsv"
    tsvSink = TsvSinkStreamOp()\
        .setFilePath(filepath)\
        .setOverwriteSink(True)
    source.link(tsvSink)
    StreamOperator.execute()

    # Read the file back; the schema declares a single string column "f".
    tsvSource = TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string")
    tsvSource.print()
    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.sink.TsvSinkStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import com.alibaba.alink.operator.stream.source.TsvSourceStreamOp;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class TsvSourceStreamOpTest {
        @Test
        public void testTsvSourceStreamOp() throws Exception {
            List<Row> df = Arrays.asList(
                Row.of("0L", "1L", 0.6),
                Row.of("2L", "2L", 0.8),
                Row.of("2L", "4L", 0.6),
                Row.of("3L", "1L", 0.6),
                Row.of("3L", "2L", 0.3),
                Row.of("3L", "4L", 0.4)
            );
            // Build a stream source from the in-memory rows.
            StreamOperator<?> source = new MemSourceStreamOp(df, "uid string, iid string, label double");
            // Write the data to a TSV file, overwriting any existing file.
            String filepath = "/tmp/abc.tsv";
            StreamOperator<?> tsvSink = new TsvSinkStreamOp()
                .setFilePath(filepath)
                .setOverwriteSink(true);
            source.link(tsvSink);
            StreamOperator.execute();
            // Read the file back; the schema declares a single string column "f".
            StreamOperator<?> tsvSource = new TsvSourceStreamOp().setFilePath(filepath).setSchemaStr("f string");
            tsvSource.print();
            StreamOperator.execute();
        }
    }

Output

| f |
| --- |
| 3L |
| 0L |
| 3L |
| 3L |
| 2L |
| 2L |