基础
引入依赖包:
<!-- Kafka connector for Flink. Keep the version identical to the Flink version used by the project. -->
<!-- NOTE(review): the _2.12 suffix presumably denotes the Scala binary version; confirm it matches the other Flink artifacts. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.1</version>
</dependency>
配置Sink:
sensor.addSink(new FlinkKafkaProducer<String>(
"192.168.127.128:9092", "sensor2", new SimpleStringSchema()));
完整测试代码:
/**
 * Reads comma-separated records from the Kafka topic "sensor", rewrites each
 * record as {@code first>second}, and writes the result to the Kafka topic
 * "sensor2" while also printing it.
 *
 * <p>Records that are empty or do not contain at least two comma-separated
 * fields are mapped to the empty string instead of failing the job.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.127.128:9092");
    properties.setProperty("group.id", "consumer-group");
    // NOTE(review): records are decoded by the SimpleStringSchema passed to the
    // consumer below, so these two deserializer settings are presumably ignored
    // by the Flink connector; confirm before removing them.
    properties.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");

    // Consume the "sensor" topic. The source and sink topics must be different;
    // otherwise the job would consume its own output and loop forever.
    SingleOutputStreamOperator<String> sensor = env
            .addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    if (value.isEmpty()) {
                        return "";
                    }
                    String[] data = value.split(",");
                    // String.split drops trailing empty fields, so inputs such as
                    // "a" or "a," yield fewer than two elements. Treat them like
                    // empty input rather than throwing ArrayIndexOutOfBoundsException.
                    if (data.length < 2) {
                        return "";
                    }
                    return data[0] + ">" + data[1];
                }
            });

    // Forward the transformed records to the "sensor2" topic.
    sensor.addSink(new FlinkKafkaProducer<String>(
            "192.168.127.128:9092", "sensor2", new SimpleStringSchema()));
    sensor.print();

    env.execute();
}
输出结果: