package pulsar.connector_flink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import java.util.Properties;
// Pulsar + Flink integration: read messages from a Pulsar topic with a Flink streaming job.
public class FlinkFromPulsarSource {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming execution environment.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Add the source that reads data from Pulsar.
        Properties properties = new Properties();
        properties.setProperty("topic", "persistent://wyx_pilsar_t/wyx_pilsar_n/my_tt01");
        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                // Pulsar broker service URL(s)
                "pulsar://192.168.72.129:6650,192.168.72.131:6650,192.168.72.132:6650",
                // Pulsar admin (HTTP) URL(s), used for topic metadata lookups
                "http://192.168.72.129:8080,192.168.72.130:8080,192.168.72.131:8080",
                // Deserialize only the message value, as a plain UTF-8 string
                PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()),
                properties
        );
        // Start consuming from the latest message (skip the existing backlog).
        pulsarSource.setStartFromLatest();
        DataStreamSource<String> streamSource = environment.addSource(pulsarSource);
        // 3. Add transformations here to analyze the data, e.g. via flatMap (see the sketch below).
        // streamSource.flatMap()
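        // A minimal sketch (an assumption, not part of the original job) of what such a
        // transformation could look like: split each incoming message on whitespace and
        // emit the individual words. It assumes the Pulsar messages are plain text and,
        // if uncommented, needs imports for org.apache.flink.util.Collector and
        // org.apache.flink.api.common.typeinfo.Types.
        //
        // streamSource
        //         .flatMap((String line, Collector<String> out) -> {
        //             for (String word : line.split("\\s+")) {
        //                 out.collect(word);
        //             }
        //         })
        //         .returns(Types.STRING) // lambdas erase generic types, so declare the output type
        //         .print();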
        // 4. Add a sink to output the results.
        streamSource.print();

        // 5. Launch the Flink job.
        environment.execute("FlinkFromPulsar");
    }
}