package pulsar.connector_flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;

import java.util.Properties;

// Pulsar integration with Flink: have Flink read messages from Pulsar
public class FlinkFromPulsarSource {

    public static void main(String[] args) throws Exception {
        // 1. Create the core environment object for Flink stream processing
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Add a source to read data from: Pulsar
        Properties properties = new Properties();
        properties.setProperty("topic", "persistent://wyx_pilsar_t/wyx_pilsar_n/my_tt01");
        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                "pulsar://192.168.72.129:6650,192.168.72.131:6650,192.168.72.132:6650",  // Pulsar broker service URL
                "http://192.168.72.129:8080,192.168.72.130:8080,192.168.72.131:8080",    // Pulsar admin (HTTP) URL
                PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                properties
        );
        // Start consuming from the latest available messages
        pulsarSource.setStartFromLatest();
        DataStreamSource<String> streamSource = environment.addSource(pulsarSource);

        // 3. Add transformation operations to analyze the data
        // streamSource.flatMap()
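
        // A minimal sketch (left commented out, like the original placeholder) of what
        // step 3 could look like: a hypothetical word count over the consumed Strings.
        // It would need imports for org.apache.flink.api.java.tuple.Tuple2,
        // org.apache.flink.util.Collector and org.apache.flink.api.common.typeinfo.Types;
        // none of this is part of the original job, which simply prints the raw messages.
        // streamSource
        //         .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        //             for (String word : line.split("\\s+")) {
        //                 out.collect(Tuple2.of(word, 1));
        //             }
        //         })
        //         .returns(Types.TUPLE(Types.STRING, Types.INT))
        //         .keyBy(t -> t.f0)   // group by word
        //         .sum(1)             // running count per word
        //         .print();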

        // 4. Add a sink component to output the computed results
        streamSource.print();

        // 5. Start the Flink job
        environment.execute("FlinkFromPulsar");
    }
}
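
// A minimal companion sketch (hypothetical class, not part of the original example):
// a plain Pulsar Java client producer that publishes a few test Strings to the same
// topic, so the Flink job above has data to print. It assumes the Pulsar client
// library is on the classpath and reuses the service URL and topic from the source above.
class PulsarTestProducer {
    public static void main(String[] args) throws Exception {
        org.apache.pulsar.client.api.PulsarClient client = org.apache.pulsar.client.api.PulsarClient.builder()
                .serviceUrl("pulsar://192.168.72.129:6650,192.168.72.131:6650,192.168.72.132:6650")
                .build();
        org.apache.pulsar.client.api.Producer<String> producer = client
                .newProducer(org.apache.pulsar.client.api.Schema.STRING)
                .topic("persistent://wyx_pilsar_t/wyx_pilsar_n/my_tt01")
                .create();
        for (int i = 0; i < 10; i++) {
            producer.send("hello flink " + i);  // synchronous send of one test message
        }
        producer.close();
        client.close();
    }
}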