Basics

Add the dependency:

<!-- The version should match your Flink version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
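
The example below also uses Flink's DataStream API. If your project does not already include it, you will likely also need the matching streaming module (a minimal sketch; the same version rule applies):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
</dependency>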

Configure the sink:

sensor.addSink(new FlinkKafkaProducer<String>(
        "192.168.127.128:9092", "sensor2", new SimpleStringSchema()));

Complete test:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamExecutionEnvironment
        = StreamExecutionEnvironment.getExecutionEnvironment();

    // Consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.127.128:9092");
    properties.setProperty("group.id", "consumer-group");
    properties.setProperty("key.deserializer",
                           "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer",
                           "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");

    // Read from the sensor topic. Note: the source and sink topics must be
    // different, otherwise the job feeds its own output back to itself in a loop!
    SingleOutputStreamOperator<String> sensor = streamExecutionEnvironment.addSource(
            new FlinkKafkaConsumer<String>("sensor",
                    new SimpleStringSchema(),
                    properties)
    ).map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            // Guard on the field count so malformed records don't throw
            // an ArrayIndexOutOfBoundsException
            String[] data = value.split(",");
            if (data.length >= 2) {
                return data[0] + ">" + data[1];
            }
            return "";
        }
    });

    // Write to the sensor2 topic
    sensor.addSink(new FlinkKafkaProducer<String>(
            "192.168.127.128:9092", "sensor2", new SimpleStringSchema()));
    sensor.print();
    streamExecutionEnvironment.execute();
}
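
One behavior worth knowing: the simple producer constructor used here only provides at-least-once delivery when checkpointing is enabled, since pending records are flushed on each checkpoint; without checkpoints, records buffered in the Kafka producer can be lost on failure. A minimal sketch of enabling it (the 5000 ms interval is an arbitrary example value):

// Enable checkpointing so the Kafka producer flushes pending records
// on each checkpoint; the interval shown is just an example.
streamExecutionEnvironment.enableCheckpointing(5000);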

Output:
[console output screenshot]