Java 类名:com.alibaba.alink.operator.stream.source.KafkaSourceStreamOp
Python 类名:KafkaSourceStreamOp

功能介绍

读Kafka版。Kafka是由Apache软件基金会开发的一个开源流处理平台。详情
请参阅:https://kafka.apache.org/

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
bootstrapServers bootstrapServers bootstrapServers String
groupId groupId groupId String
startupMode startupMode startupMode String “EARLIEST”, “GROUP_OFFSETS”, “LATEST”, “TIMESTAMP”
properties 用户自定义Kafka参数 用户自定义Kafka参数,形如: “prop1= val1, prop2 = val2” String null
startTime 起始时间 起始时间。默认从当前时刻开始读。 String null
topic topic名称 topic名称 String null
topicPattern topic pattern topic pattern String null

代码示例

以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

Python 代码

  1. data = KafkaSourceStreamOp() \
  2. .setBootstrapServers("localhost:9092") \
  3. .setTopic("iris") \
  4. .setStartupMode("EARLIEST") \
  5. .setGroupId("alink_group")
  6. data.print()
  7. StreamOperator.execute()

Java 代码

  1. import com.alibaba.alink.operator.stream.StreamOperator;
  2. import com.alibaba.alink.operator.stream.source.KafkaSourceStreamOp;
  3. import org.junit.Test;
  4. public class KafkaSourceStreamOpTest {
  5. @Test
  6. public void testKafkaSourceStreamOp() throws Exception {
  7. StreamOperator <?> data = new KafkaSourceStreamOp()
  8. .setBootstrapServers("localhost:9092")
  9. .setTopic("iris")
  10. .setStartupMode("EARLIEST")
  11. .setGroupId("alink_group");
  12. data.print();
  13. StreamOperator.execute();
  14. }
  15. }