https://mp.weixin.qq.com/s/2kbMDuEH70D4s5io6UOBCw

构建工具

Flink项目可以使用不同的构建工具进行构建。为了能够快速入门,Flink 为以下构建工具提供了项目模版:

  • Maven
  • Gradle

这些模版可以帮助你搭建项目结构并创建初始构建文件。

Maven

环境要求

唯一的要求是使用 Maven 3.0.4 (或更高版本)和安装 Java 8.x

创建项目

使用以下命令之一来 创建项目
使用Maven archetypes

  1. mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.9.0

运行quickstart脚本,然后运行示范

  1. curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

windows安装netcat

https://www.codenong.com/cs106159249/
安装这个可以查看程序运行的结果:
https://eternallybored.org/misc/netcat/

NC命令使用
https://lddp.github.io/2018/05/10/%E5%B8%B8%E8%A7%81%E7%AB%AF%E5%8F%A3%E8%BF%9E%E6%8E%A5%E4%BB%A5%E5%8F%8Anc%E7
%9A%84%E4%BD%BF%E7%94%A8/
cmd中使用 nc -l -p flink端口号 ,类似socket连接,flink程序启动后,对应的flink端口处于监听状态,然后在该界面输入单词,那flink的单词统计程序就能处理了。
这个程序相当于一个 TaskManager,负责处理任务,可以导入服务端,被JobManager管理。然后 nc 程序相当于客户端,负责提交客户端的任务。
TaskManager 打成jar 导入 Job中

注意点1
首先nc -l 客户端程序先启动,然后再提交任务,Submit new job,这样才不会拒绝连接,

注意点2
上传文件的位置 E:\zbigdata
提交任务的参数配置: —hostname 192.168.161.7 —port 9999
image.png

注意点3:
https://www.1024sou.com/article/472011.html
https://vimsky.com/examples/detail/java-method-org.apache.flink.streaming.api.datastream.DataStream.assignTimestampsAndWatermarks.html
https://www.hnbian.cn/posts/dbaa157a.html

  1. Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
  2. Is the time characteristic set to 'ProcessingTime', or did you forget to call
  3. 'DataStream.assignTimestampsAndWatermarks(...)'?
  4. 该异常的处理方式:
  5. DataStream text = env.socketTextStream("10.192.78.17", 9000, "\n");
  6. // 首先将字符串数据解析成单词和次数(使用元组类型Tuple2<String, Integer>表示),
  7. // 第一个字段是单词,第二个字段是次数,次数初始值都设置成1
  8. DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  9. @Override
  10. public void flatMap(String in, Collector out) throws Exception {
  11. for (String word : in.split("\\s")) {
  12. out.collect(Tuple2.of(word, 1));
  13. }
  14. }
  15. }).keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
  16. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  17. .sum(1);
  18. 解决:将TumblingEventTimeWindows改为TumblingProcessingTimeWindows(具体还是看业务)

异常1

  1. Flink 测试报错 java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
  2. pom.xml文件中的
  3. <scope>provided</scope>
  4. 注释掉

flink 内存设置
flink-conf.yaml中修改下边的参数

查看日志
tail -f log/flink--taskexecutor-.out

NC执行
image.png