package pulsar.connector_flink;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;import org.apache.pulsar.client.impl.schema.AvroSchema;import javax.xml.validation.Schema;import java.util.Properties;// Pulsar整合flink, 完成让flink从Pulsar中读取消息的操作public class FlinkFromPulsarSource { public static void main(String[] args) throws Exception { //1. 创建Flink的流式计算的核心环境类对象 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //2. 添加source数据源, 用于读取数据 : Pulsar Properties properties = new Properties(); properties.setProperty("topic", "persistent://wyx_pilsar_t/wyx_pilsar_n/my_tt01"); FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>( "pulsar://192.168.72.129:6650,192.168.72.131:6650,192.168.72.132:6650", "http://192.168.72.129:8080,192.168.72.130:8080,192.168.72.131:8080", PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()), properties ); pulsarSource.setStartFromLatest(); DataStreamSource<String> streamSource = environment.addSource(pulsarSource); //3. 添加一些转换处理的操作, 对数据进行统计分析// streamSource.flatMap() //4. 添加 sink的组件, 将计算的结果进行输出操作 streamSource.print(); //5. 启动flink程序 environment.execute("FinkFromPulsar"); }}