一、Java版flink-wordount离线计算版

1、maven构建项目搭建开发环境

  • maven构建scala项目
  • 因为要同时编写scala和Java,所以需要加入依赖,并且在main目录下创建Java文件夹

    1. <!-- 因为往往是scala和java在一起混合开发,故需要设置多个源文件目录,故需要maven新插件build-helper-maven-plugin来支持设置多个源文件夹,也可以设置多个资源路径 -->
    2. <plugin>
    3. <groupId>org.codehaus.mojo</groupId>
    4. <artifactId>build-helper-maven-plugin</artifactId>
    5. <version>3.0.0</version>
    6. <executions>
    7. <execution>
    8. <id>add-source</id>
    9. <phase>generate-sources</phase>
    10. <goals>
    11. <goal>add-source</goal>
    12. </goals>
    13. <configuration>
    14. <sources>
    15. <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
    16. <source>${basedir}/src/main/java</source>
    17. <source>${basedir}/src/main/scala</source>
    18. </sources>
    19. </configuration>
    20. </execution>
    21. <execution>
    22. <id>add-test-source</id>
    23. <phase>generate-test-sources</phase>
    24. <goals>
    25. <goal>add-test-source</goal>
    26. </goals>
    27. <configuration>
    28. <sources>
    29. <source>${basedir}/src/test/scala</source>
    30. <source>${basedir}/src/test/java</source>
    31. </sources>
    32. </configuration>
    33. </execution>
    34. </executions>
    35. </plugin>
  • 然后加入flink依赖

    1. <!-- 一般环境配置 -->
    2. <properties>
    3. <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    4. <maven.compiler.source>1.8</maven.compiler.source>
    5. <maven.compiler.target>1.8</maven.compiler.target>
    6. <encoding>UTF-8</encoding>
    7. <scala.version>2.11.11</scala.version>
    8. <scala.compile.at.version>2.11</scala.compile.at.version>
    9. <flink.version>1.13.1</flink.version>
    10. <jdk.version>1.8</jdk.version>
    11. </properties>
    12. <!-- flink包依赖配置-start -->
    13. <dependency>
    14. <groupId>org.apache.flink</groupId>
    15. <artifactId>flink-java</artifactId>
    16. <version>${flink.version}</version>
    17. <!-- <scope>provided</scope> -->
    18. </dependency>
    19. <dependency>
    20. <groupId>org.apache.flink</groupId>
    21. <artifactId>flink-clients_${scala.compile.at.version}</artifactId>
    22. <version>${flink.version}</version>
    23. </dependency>
    24. <!-- flink包依赖配置-end -->

    2、Java代码的编写

    ```java public class FlinkWordCount4DataSet { public static void main(String[] args) throws Exception { //创建环境变量 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //定义文件路径 String filePath = “”; if (args == null||args.length==0){

    1. filePath = "C:\\学习\\技术学习\\data.txt";

    }else {

    1. filePath = args[0];

    } //获取输入文件对应的DataSet对象 DataSet inputLineDataSet = env.readTextFile(filePath); //对数据集进行多个算子处理,转换为(word,1)二元组 DataSet> resultSet = inputLineDataSet

    1. .flatMap(
    2. new FlatMapFunction<String, Tuple2<String, Integer>>() {
    3. @Override
    4. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
    5. String[] wordArray = s.split(" ");
    6. for (String word:wordArray){
    7. collector.collect(new Tuple2<String, Integer>(
    8. word, 1));
    9. }
    10. }
    11. }
    12. ).groupBy(0).sum(1);

    //打印 resultSet.print(); } }

  1. <a name="tRflf"></a>
  2. ## 3、项目打包与部署
  3. - 需要加入对应的build打包配置,由于flink复杂的打包依赖与之前的有一些差距。
  4. - 去掉之前maven-assembly-plugin打包配置插件,当其配置复杂依赖、多配置文件时容易出现版本依赖处理异常。
  5. - 加入解决配置文件、版本不一致冲突的maven-shade-plugin打包配置插件,以后所有的all-in-one打包,均可以优先使用maven-shade-plugin配置打包。
  6. - 需要加入的build如下。
  7. ```xml
  8. <!-- 打包配置 -->
  9. <build>
  10. <plugins>
  11. <!-- 因为往往是scala和java在一起混合开发,故需要设置多个源文件目录,故需要maven新插件build-helper-maven-plugin来支持设置多个源文件夹,也可以设置多个资源路径 -->
  12. <plugin>
  13. <groupId>org.codehaus.mojo</groupId>
  14. <artifactId>build-helper-maven-plugin</artifactId>
  15. <version>3.0.0</version>
  16. <executions>
  17. <execution>
  18. <id>add-source</id>
  19. <phase>generate-sources</phase>
  20. <goals>
  21. <goal>add-source</goal>
  22. </goals>
  23. <configuration>
  24. <sources>
  25. <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
  26. <source>${basedir}/src/main/java</source>
  27. <source>${basedir}/src/main/scala</source>
  28. </sources>
  29. </configuration>
  30. </execution>
  31. </executions>
  32. </plugin>
  33. <plugin>
  34. <artifactId>maven-compiler-plugin</artifactId>
  35. <version>2.3.2</version>
  36. <configuration>
  37. <source>${jdk.version}</source>
  38. <target>${jdk.version}</target>
  39. <encoding>${encoding}</encoding>
  40. </configuration>
  41. </plugin>
  42. <!--打all-in-one jar包 -->
  43. <plugin>
  44. <groupId>org.apache.maven.plugins</groupId>
  45. <artifactId>maven-shade-plugin</artifactId>
  46. <version>2.3</version>
  47. <executions>
  48. <execution>
  49. <phase>package</phase>
  50. <goals>
  51. <goal>shade</goal>
  52. </goals>
  53. <configuration>
  54. <transformers>
  55. <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  56. <mainClass>flink.KafkaDemo1</mainClass> </transformer> -->
  57. <transformer
  58. implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  59. <resource>reference.conf</resource>
  60. </transformer>
  61. </transformers>
  62. </configuration>
  63. </execution>
  64. </executions>
  65. </plugin>
  66. </plugins>
  67. </build>
  • 然后执行maven打包,得到jar包。
  • 上传到服务器上。
  • 有三种方法

    4、执行的三种方式

    1、第一种方式-传统的yarn jar方式(java -cp)

    • 优点:
      • 简单、易操作。
    • 缺点:
      • 默认仅支持local模式,改成分布式模式很麻烦。
      • 在打包时需要去掉provided。最终形成的jar包比较大。
    • 命令格式:

      • jarn jar jar包名 类名 args

        2、第二种执行方式-flink建议的执行方式

    • 优点:

      • 支持flink的所有方式,比较灵活。
      • 不需要再打入那么多的包。
    • 缺点:
      • 较第一种比较复杂,需要下载flink发布包,解压缩即可。
    • 注意事项:
      • 使hadoop环境变量生效。
        • 方式一
          • 将hadoop的环境变量设置到linux profile中
            • cat 、etc/profile
            • 图片.png
        • 方式二
          • 每次执行的代码之前先执行erport命令
          • 图片.png

            1、flink的三种运行模式实践

  • yarn application模式

    • 操作步骤
      • 首先进入安装flink的home路径
      • 执行flink任务的代码即可
        • run-application:即运行作业类型。专指application类型。
        • -t:指定运行模式
        • -c:指入口主类
        • app.jar:上传的jar包
        • 参数运行类的参数
        • 示例 ``xml [miaohongkai@cluster0 flink-1.13.1]$ export HADOOP_CLASSPATH=hadoop classpath` [miaohongkai@cluster0 flink-1.13.1]$ ./bin/flink run-application -t yarn-application -c com.tledu.FlinkWordCount4DataSet /home/job018/miaohongkai/flink/Flink-1.0-SNAPSHOT.jar hdfs:///user/miaohongkai/data.txt
  1. - 运行结果
  2. - 进入yarn中,找到运行日志
  3. - ![图片.png](https://cdn.nlark.com/yuque/0/2021/png/22215002/1638777435386-7da0e49e-6102-4a23-bc66-f76421289248.png#clientId=u579be0f1-4736-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=720&id=u492ebc41&margin=%5Bobject%20Object%5D&name=%E5%9B%BE%E7%89%87.png&originHeight=1439&originWidth=1405&originalType=binary&ratio=1&rotation=0&showTitle=false&size=80700&status=done&style=none&taskId=u591ac617-4f51-4710-8a91-affa41169db&title=&width=702.5)
  4. - yarn per-job运行模式
  5. - 操作步骤
  6. - 首先进入安装路径
  7. - 执行flink提交代码
  8. - run:即作业运行模式。包括sessionper-job
  9. - -t:指定运行模式
  10. - -c:指定入口主类
  11. - APP.jar:上传的jar
  12. - 参数
  13. - 需要先设置不需要进行classloader leaked check,在配置文件路径设置./conf/flink-conf.yaml,有则修改,无则新加即可
  14. #注意yaml参数文件修改,请在value前加上一个空格<br />classloader.check-leaked-classloader: false
  15. - 然后运行
  16. ```xml
  17. ./bin/flink run -t yarn-per-job -c com.tledu.FlinkWordCount4DataSet
  18. ../Flink-1.0-SNAPSHOT.jar
  19. hdfs:///user/miaohongkai/data.txt
  1. - 运行结果
  2. - ![图片.png](https://cdn.nlark.com/yuque/0/2021/png/22215002/1638779470164-08d58e4d-ba9b-4ac5-b3b8-54084dc44c02.png#clientId=u579be0f1-4736-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=421&id=u32f0bf9c&margin=%5Bobject%20Object%5D&name=%E5%9B%BE%E7%89%87.png&originHeight=841&originWidth=947&originalType=binary&ratio=1&rotation=0&showTitle=false&size=20765&status=done&style=none&taskId=u3abb1413-f437-4417-940d-321d5fc288a&title=&width=473.5)
  • yarn session运行方式
    • 操作步骤
      • 首先进入flinkhome目录
      • 附加模式(默认模式)
        • 特点
          • yarn-session.sh客户端将 Flink 集群提交给 YARN,但客户端保持运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会通知集群关闭。
        • 工作流程
          • 首先在yarn上启动flink session,并得到session task任务的yarn app-id。
          • ./bin/yarn-session.sh
          • 运行截图(一直在等待中,并不退出)

图片.png

  1. - 提交作业到session
  2. - #第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id

./bin/flink run -t yarn-session -Dyarn.application.id=application_1627998129686_0475 -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt

  1. - #第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id

./bin/flink run -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt

  1. - 注意:也需要加export HADOOP_CLASSPATH=`hadoop classpath`
  2. - 运行截图
  3. - ![图片.png](https://cdn.nlark.com/yuque/0/2021/png/22215002/1638782380060-1304b793-0904-4eed-b200-d469c83a7301.png#clientId=ue01e97df-fb52-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=433&id=u2aa70eec&margin=%5Bobject%20Object%5D&name=%E5%9B%BE%E7%89%87.png&originHeight=865&originWidth=1052&originalType=binary&ratio=1&rotation=0&showTitle=false&size=28109&status=done&style=none&taskId=u668646aa-6342-45fc-aa30-def68e6ab45&title=&width=526)
  4. - 分离模式-detached模式
  5. - 特点
  6. - yarn-session.sh客户端将Flink集群提交给YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。
  7. - 流程
  8. - 首先在yarn上提前启动flink session会话任务,并得到session task任务的yarn app-id
  9. - ./bin/yarn-session.sh --detached
  10. - 提交任务(与默认模式一样)
  11. - #第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id

./bin/flink run -t yarn-session -Dyarn.application.id=application_1627998129686_0475 -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt

  1. - #第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id

./bin/flink run -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt

  1. - 注意:也需要加export HADOOP_CLASSPATH=`hadoop classpath`

二、Java版flink-wordcount实时计算版

1、加入依赖

  1. <!-- flink包依赖配置-start -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>${flink.version}</version>
  6. <!-- <scope>provided</scope> -->
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-clients_${scala.version}</artifactId>
  11. <version>${flink.version}</version>
  12. <!-- <scope>provided</scope> -->
  13. </dependency>
  14. <!-- flink包依赖配置-end -->

2、代码实现

  1. public class FlinkWordCount4DataStream {
  2. public static void main(String[] args) throws Exception {
  3. //创建上下文环境变量
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.setParallelism(1);
  6. //定义主机和端口号
  7. String host = "localhost";
  8. int port = 9999;
  9. //获取输入的socket输入实时数据流
  10. DataStream<String> inputLineDataStream = env.socketTextStream(host,port);
  11. //进行算子操作
  12. DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
  13. .flatMap(
  14. new FlatMapFunction<String, Tuple2<String, Integer>>() {
  15. @Override
  16. public void flatMap(String line,
  17. Collector<Tuple2<String, Integer>> out)
  18. throws Exception {
  19. String[] wordArray = line.split("\\s");
  20. for (String word : wordArray) {
  21. out.collect(new Tuple2<String, Integer>(
  22. word, 1));
  23. }
  24. }
  25. }).keyBy(0)
  26. .sum(1);
  27. //打印
  28. resultStream.print();
  29. //正式启动实时流处理引擎
  30. env.execute();
  31. }
  32. }
  • 启动nc 输入 nc -lp 9999
  • 运行Java程序
  • 成功

    三、Scala版flink-wordcount离线计算版

    1、环境搭建

    1. <!-- flink包依赖配置-start -->
    2. <!-- java开发flink依赖-start -->
    3. <dependency>
    4. <groupId>org.apache.flink</groupId>
    5. <artifactId>flink-java</artifactId>
    6. <version>${flink.version}</version>
    7. <scope>provided</scope>
    8. </dependency>
    9. <dependency>
    10. <groupId>org.apache.flink</groupId>
    11. <artifactId>flink-clients_${scala.compile.version}</artifactId>
    12. <version>${flink.version}</version>
    13. <scope>provided</scope>
    14. </dependency>
    15. <!-- java开发flink依赖-end -->
    16. <!-- scala开发flink依赖-start -->
    17. <dependency>
    18. <groupId>org.apache.flink</groupId>
    19. <artifactId>flink-scala_${scala.compile.version}</artifactId>
    20. <version>${flink.version}</version>
    21. <scope>provided</scope>
    22. </dependency>
    23. <dependency>
    24. <groupId>org.apache.flink</groupId>
    25. <artifactId>flink-streaming-scala_${scala.compile.version}</artifactId>
    26. <version>${flink.version}</version>
    27. <scope>provided</scope>
    28. </dependency>
    29. <!-- scala开发flink依赖-end -->
    30. <!-- flink包依赖配置-end -->

    2、代码编写

    1. object FlinkWordCount4DataSet4Scala {
    2. def main(args: Array[String]): Unit = {
    3. //获取上下文环境
    4. var env = ExecutionEnvironment.getExecutionEnvironment
    5. //加载字符串(测试用)
    6. //val source = env.fromElements("中国 抗美 援朝 战争 很伟大", "抗美 需要 中国")
    7. //定义从本地文件系统当中文件路径
    8. var filePath = ""
    9. if(args == null||args.length==0){
    10. filePath = "C:\\学习\\技术学习\\data.txt"
    11. }else{
    12. filePath = args(0)
    13. }
    14. val source = env.readTextFile(filePath)
    15. //进行transformaion操作
    16. val ds = source.flatMap(s => s.split(" ")).map(t => (t,1)).groupBy(0).sum(1)
    17. //输出
    18. ds.print()
    19. // 由于是Batch操作,当DataSet调用print方法时,源码内部已经调用Excute方法,所以此处不再调用
    20. //如果调用反而会出现上下文不匹配的执行错误
    21. //env.execute("Flink Batch Word Count By Scala")
    22. }
    23. }

    四、Scala版flink-wordcount实时计算版

    1、构建maven环境

  • scala离线版的开发已将实时依赖加入,不用再做增加或是改变

    2、代码编写

    ```scala object FlinkWordCount4DataStream4Scala { def main(args: Array[String]): Unit = { //获取上下文环境 var env = StreamExecutionEnvironment.getExecutionEnvironment //加载数据源 var source = env.socketTextStream(“localhost”,9999,’ ‘) //进行transformation var ds = source.flatMap(x => x.split(‘ ‘)).map(t => (t,1)).keyBy(0).sum(1) //输出 ds.print() //执行操作 env.execute(“FlinkWordCount4DataStream4Scala”) }

} ```

3、使用nc测试

图片.png