https://mp.weixin.qq.com/s/2kbMDuEH70D4s5io6UOBCw
构建工具
Flink项目可以使用不同的构建工具进行构建。为了能够快速入门,Flink 为以下构建工具提供了项目模版:
- Maven
- Gradle
Maven
环境要求
唯一的要求是使用 Maven 3.0.4 (或更高版本)和安装 Java 8.x。
创建项目
使用以下命令之一来 创建项目:
使用Maven archetypes
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
运行quickstart脚本,然后运行示范
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
windows安装netcat
https://www.codenong.com/cs106159249/
安装这个可以查看程序运行的结果:
https://eternallybored.org/misc/netcat/
NC命令使用
https://lddp.github.io/2018/05/10/%E5%B8%B8%E8%A7%81%E7%AB%AF%E5%8F%A3%E8%BF%9E%E6%8E%A5%E4%BB%A5%E5%8F%8Anc%E7
%9A%84%E4%BD%BF%E7%94%A8/
cmd中使用 nc -l -p flink端口号 ,类似socket连接,flink程序启动后,对应的flink端口处于监听状态,然后在该界面输入单词,那flink的单词统计程序就能处理了。
这个程序相当于一个 TaskManager,负责处理任务,可以导入服务端,被JobManager管理。然后 nc 程序相当于客户端,负责提交客户端的任务。
TaskManager 打成jar 导入 Job中
注意点1:
首先nc -l 客户端程序先启动,然后再提交任务,Submit new job,这样才不会拒绝连接,
注意点2:
上传文件的位置 E:\zbigdata
提交任务的参数配置: —hostname 192.168.161.7 —port 9999
注意点3:
https://www.1024sou.com/article/472011.html
https://vimsky.com/examples/detail/java-method-org.apache.flink.streaming.api.datastream.DataStream.assignTimestampsAndWatermarks.html
https://www.hnbian.cn/posts/dbaa157a.html
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
Is the time characteristic set to 'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
该异常的处理方式:
DataStream text = env.socketTextStream("10.192.78.17", 9000, "\n");
// 首先将字符串数据解析成单词和次数(使用元组类型Tuple2<String, Integer>表示),
// 第一个字段是单词,第二个字段是次数,次数初始值都设置成1
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String in, Collector out) throws Exception {
for (String word : in.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1);
解决:将TumblingEventTimeWindows改为TumblingProcessingTimeWindows(具体还是看业务)
异常1:
Flink 测试报错 java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
pom.xml文件中的
<scope>provided</scope>
注释掉
flink 内存设置:
flink-conf.yaml中修改下边的参数
查看日志:
tail -f log/flink--taskexecutor-.out
NC执行: