Java class name: com.alibaba.alink.operator.stream.sql.WhereStreamOp
Python class name: WhereStreamOp

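Overview

Applies a SQL WHERE operation to streaming data: only records that satisfy the given condition are passed downstream.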
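Conceptually, a WHERE operation on a stream evaluates the condition against each record as it arrives and forwards only the records that match. The following is a minimal plain-Python sketch of that idea, not Alink's implementation; `where_stream`, `Record`, and `incoming` are hypothetical names used only for illustration.

```python
from typing import Callable, Iterable, Iterator, Tuple

# Hypothetical record shape matching (col1, col2, col3, col4).
Record = Tuple[str, int, float, float]


def where_stream(
    records: Iterable[Record],
    predicate: Callable[[Record], bool],
) -> Iterator[Record]:
    """Yield each record that satisfies the predicate, one at a time."""
    for record in records:
        if predicate(record):
            yield record


# An iterator stands in for an unbounded stream of incoming records.
incoming = iter([
    ("a", 1, 1.1, 1.2),
    ("b", -2, 0.9, 1.0),
    ("a", 1, 1.1, 1.2),
])

# Keep only records whose first field equals "a".
kept = list(where_stream(incoming, lambda r: r[0] == "a"))
print(kept)
```

Because the generator handles one record at a time, it never needs the whole input in memory, which is the property that makes a filter of this kind suitable for streams.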

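Parameters

| Name | Display Name | Description | Type | Required? | Value Range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| clause | Clause | The condition to apply in the WHERE operation, for example `col1='a'` | String | ✓ | | |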

Code Examples

Python Code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    # Sample input: eight rows with one string column and three numeric columns.
    df = pd.DataFrame([
        ["a", 1, 1.1, 1.2],
        ["b", -2, 0.9, 1.0],
        ["c", 100, -0.01, 1.0],
        ["d", -99, 100.9, 0.1],
        ["a", 1, 1.1, 1.2],
        ["b", -2, 0.9, 1.0],
        ["c", 100, -0.01, 0.2],
        ["d", -99, 100.9, 0.3]
    ])

    source = StreamOperator.fromDataframe(df, schemaStr='col1 string, col2 int, col3 double, col4 double')

    # Keep only the rows whose col1 equals 'a', then print them.
    source.link(WhereStreamOp().setClause("col1='a'")).print()

    # Stream jobs run only once execute() is called.
    StreamOperator.execute()

Java Code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import com.alibaba.alink.operator.stream.sql.WhereStreamOp;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class WhereStreamOpTest {
        @Test
        public void testWhereStreamOp() throws Exception {
            // Sample input: eight rows with one string column and three numeric columns.
            List<Row> inputRows = Arrays.asList(
                Row.of("a", 1, 1.1, 1.2),
                Row.of("b", -2, 0.9, 1.0),
                Row.of("c", 100, -0.01, 1.0),
                Row.of("d", -99, 100.9, 0.1),
                Row.of("a", 1, 1.1, 1.2),
                Row.of("b", -2, 0.9, 1.0),
                Row.of("c", 100, -0.01, 0.2),
                Row.of("d", -99, 100.9, 0.3)
            );
            StreamOperator<?> source = new MemSourceStreamOp(inputRows,
                "col1 string, col2 int, col3 double, col4 double");
            // Keep only the rows whose col1 equals 'a'.
            StreamOperator<?> output = source.link(new WhereStreamOp().setClause("col1='a'"));
            output.print();
            // Stream jobs run only once execute() is called.
            StreamOperator.execute();
        }
    }

Results

| col1 | col2 | col3 | col4 |
| --- | --- | --- | --- |
| a | 1 | 1.1000 | 1.2000 |
| a | 1 | 1.1000 | 1.2000 |
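The expected output can be cross-checked without an Alink or Flink environment by running the same condition over the same eight rows with Python's built-in `sqlite3` module. This is only a stand-in: SQLite is not the SQL engine Alink uses, so dialect details may differ for more complex conditions. The table name `input_rows` and the helper `where_filter` are hypothetical names chosen for this sketch.

```python
import sqlite3

# The same eight sample rows used in the examples on this page.
ROWS = [
    ("a", 1, 1.1, 1.2),
    ("b", -2, 0.9, 1.0),
    ("c", 100, -0.01, 1.0),
    ("d", -99, 100.9, 0.1),
    ("a", 1, 1.1, 1.2),
    ("b", -2, 0.9, 1.0),
    ("c", 100, -0.01, 0.2),
    ("d", -99, 100.9, 0.3),
]


def where_filter(rows, clause):
    """Hypothetical helper: apply a WHERE condition to rows using SQLite."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE input_rows "
            "(col1 TEXT, col2 INTEGER, col3 REAL, col4 REAL)"
        )
        conn.executemany("INSERT INTO input_rows VALUES (?, ?, ?, ?)", rows)
        # The clause is interpolated verbatim, so pass only trusted text here.
        query = f"SELECT col1, col2, col3, col4 FROM input_rows WHERE {clause}"
        return conn.execute(query).fetchall()
    finally:
        conn.close()


result = where_filter(ROWS, "col1='a'")
for row in result:
    print(row)
```

Under these assumptions the query returns the two rows whose `col1` is `a`, matching the table above apart from number formatting.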