Integrating Flink into a Spring Boot application lets you stand the whole project up quickly and keep the focus on the business logic. A number of problems came up during the integration; the biggest is that the Flink stream cannot access classes in the Spring container, which leads to NullPointerExceptions. The way out is to initialize the Spring beans inside the stream itself to obtain an ApplicationContext, and then fetch class instances through its getBean method.
    Versions: Spring Boot 2.1.6 + Flink 1.6.1 + JDK 1.8
    The main program:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.Banner;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class HadesTmsApplication implements CommandLineRunner {

        // Injected from application.properties; the property keys here are illustrative.
        @Value("${kafka.bootstrap.servers}")
        private String bootstrap_servers;
        @Value("${kafka.zookeeper.connect}")
        private String zookeeper_connect;
        @Value("${kafka.group.id}")
        private String group_id;

        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(HadesTmsApplication.class);
            application.setBannerMode(Banner.Mode.OFF);
            application.run(args);
        }

        @Override
        public void run(String... args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("topic-name", new SimpleStringSchema(), getProperties());
            DataStream<String> dataStream = env.addSource(kafkaConsumer);
            // processing logic omitted here
            dataStream.addSink(new MySink());
            env.execute();
        }

        private Properties getProperties() {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", bootstrap_servers);
            properties.setProperty("zookeeper.connect", zookeeper_connect);
            properties.setProperty("group.id", group_id);
            // Consumer-side deserialization settings.
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }
    }

    A note: since this is a non-web project, the application implements the CommandLineRunner interface and overrides its run method, which is where the stream-processing logic goes.
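    If MySink is a plain class that needs a bean from the Spring container, it has no way to get one, and a NullPointerException follows. A minimal sketch of that failing naive version (my illustration, with TeacherInfoMapper standing in for any bean in the container):

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.springframework.beans.factory.annotation.Autowired;

    // Naive sink: Flink serializes and instantiates this object itself, outside
    // the Spring container, so the @Autowired annotation never takes effect.
    public class MySink implements SinkFunction<String> {

        @Autowired // ineffective where the sink actually runs -> field stays null
        private TeacherInfoMapper teacherInfoMapper;

        @Override
        public void invoke(String value, Context context) throws Exception {
            // NullPointerException at runtime:
            teacherInfoMapper.selectByPage(0, 100);
        }
    }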
    The ApplicationContextAware interface may come to mind as a fix: implement it to capture the ApplicationContext and look beans up statically, that is:

    import java.io.Serializable;

    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;

    @Component
    public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

        private static final long serialVersionUID = -6454872090519042646L;

        private static ApplicationContext applicationContext = null;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            if (ApplicationContextUtil.applicationContext == null) {
                ApplicationContextUtil.applicationContext = applicationContext;
            }
        }

        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }

        // Look up a bean by name.
        public static Object getBean(String name) {
            return getApplicationContext().getBean(name);
        }

        // Look up a bean by class.
        public static <T> T getBean(Class<T> clazz) {
            return getApplicationContext().getBean(clazz);
        }

        // Look up a bean by name and class.
        public static <T> T getBean(String name, Class<T> clazz) {
            return getApplicationContext().getBean(name, clazz);
        }
    }
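    Called from code running in the JVM where Spring booted, the helper behaves as expected. A minimal hypothetical demo (DemoService is an illustrative bean, and ApplicationContextUtil is assumed to sit in a scanned package):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.stereotype.Service;

    @SpringBootApplication
    public class ContextUtilDemo {

        @Service
        public static class DemoService {
            public String hello() { return "hello from the container"; }
        }

        public static void main(String[] args) {
            SpringApplication.run(ContextUtilDemo.class, args);
            // Works here: setApplicationContext has been called in this JVM.
            DemoService service = ApplicationContextUtil.getBean(DemoService.class);
            System.out.println(service.hello());
        }
    }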

    In Flink stream processing, however, this approach still does not work: the sink function is serialized, shipped to the TaskManagers and deserialized there, and in those JVMs no Spring container was ever started, so the static field remains null. In my earlier Flink article Flink读写系列之-读mysql并写入mysql, the read and write stages each have an open method meant precisely for initialization, and that is where the Spring beans can be bootstrapped. The sink, reworked along those lines (named SimpleSink here), becomes:

    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.Banner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.ApplicationContext;

    @EnableAutoConfiguration
    @MapperScan(basePackages = {"com.xxx.bigdata.xxx.mapper"})
    public class SimpleSink extends RichSinkFunction<String> {

        // Assigned in open(), so it does not need to survive serialization.
        private transient TeacherInfoMapper teacherInfoMapper;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Boot a Spring context on the TaskManager and fetch the bean from it.
            SpringApplication application = new SpringApplication(SimpleSink.class);
            application.setBannerMode(Banner.Mode.OFF);
            ApplicationContext context = application.run(new String[]{});
            teacherInfoMapper = context.getBean(TeacherInfoMapper.class);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            List<TeacherInfo> teacherInfoList = teacherInfoMapper.selectByPage(0, 100);
            teacherInfoList.forEach(teacherInfo -> System.out.println("teacherinfo:" + teacherInfo.getTeacherId()
                    + "," + teacherInfo.getTimeBit() + "," + teacherInfo.getWeek()));
        }
    }

    Inside invoke, the Mapper from the Spring container is now available.
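    One caveat: open() runs once per parallel subtask, so with a parallelism above 1 several Spring contexts would be booted inside the same TaskManager JVM. If that becomes a problem, a JVM-wide holder shared by all subtasks is one option; a minimal sketch (SpringContextHolder is a hypothetical helper, not part of the project above):

    import org.springframework.boot.Banner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ConfigurableApplicationContext;

    // Hypothetical helper: at most one Spring context per JVM, shared by all subtasks.
    public final class SpringContextHolder {

        private static volatile ConfigurableApplicationContext context;

        private SpringContextHolder() {
        }

        // Double-checked locking so that concurrent open() calls boot Spring only once.
        public static ApplicationContext get(Class<?> configClass) {
            if (context == null) {
                synchronized (SpringContextHolder.class) {
                    if (context == null) {
                        SpringApplication application = new SpringApplication(configClass);
                        application.setBannerMode(Banner.Mode.OFF);
                        context = application.run();
                    }
                }
            }
            return context;
        }
    }

    With this helper, open() shrinks to teacherInfoMapper = SpringContextHolder.get(SimpleSink.class).getBean(TeacherInfoMapper.class);.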
    The pom is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.xxx.bigdata</groupId>
        <artifactId>flink-project</artifactId>
        <version>1.0.0</version>
        <name>flink-project</name>
        <packaging>jar</packaging>
        <description>My project for Spring Boot</description>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <flink.version>1.6.1</flink.version>
            <skipTests>true</skipTests>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>ch.qos.logback</groupId>
                        <artifactId>logback-classic</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.cloudera</groupId>
                <artifactId>ImpalaJDBC41</artifactId>
                <version>2.6.4</version>
            </dependency>
            <dependency>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP</artifactId>
                <version>3.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.3.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>application.properties</include>
                        <include>application-${package.environment}.properties</include>
                    </includes>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <skip>true</skip>
                        <mainClass>com.miaoke.bigdata.tms.HadesTmsApplication</mainClass>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- mybatis plugin to generate mapper files and classes -->
                <plugin>
                    <groupId>org.mybatis.generator</groupId>
                    <artifactId>mybatis-generator-maven-plugin</artifactId>
                    <version>1.3.5</version>
                    <configuration>
                        <configurationFile>${basedir}/src/main/resources/generatorConfig.xml</configurationFile>
                        <overwrite>true</overwrite>
                        <verbose>true</verbose>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>com.cloudera</groupId>
                            <artifactId>ImpalaJDBC41</artifactId>
                            <version>2.6.4</version>
                        </dependency>
                    </dependencies>
                </plugin>
            </plugins>
        </build>
        <profiles>
            <!-- dev environment -->
            <profile>
                <id>dev</id>
                <properties>
                    <package.environment>dev</package.environment>
                </properties>
                <!-- active by default -->
                <activation>
                    <activeByDefault>true</activeByDefault>
                </activation>
            </profile>
            <!-- pre-release environment -->
            <profile>
                <id>pre</id>
                <properties>
                    <package.environment>pre</package.environment>
                </properties>
            </profile>
            <!-- production environment -->
            <profile>
                <id>pro</id>
                <properties>
                    <package.environment>pro</package.environment>
                </properties>
            </profile>
        </profiles>
    </project>

    The project is packaged with the default spring boot plugin, with skip set to true. Without that setting, the repackaged jar gains a BOOT-INF directory, which at runtime causes ClassNotFoundException and assorted other failures (for example with Kafka streaming), and can even force you to flip Flink's classloading order from child-first to parent-first (classloader.resolve-order in the Flink configuration file).
    Problems encountered:
    1. java.lang.NoSuchMethodError: com.google.gson.GsonBuilder.setLenient()Lcom/google/gson/GsonBuilder;
    The GsonBuilder class comes from a gson-xxx.jar, yet running mvn dependency:tree on my own project showed no dependency on any such package. Could Flink be using a gson bundled in its own lib directory at runtime? Checking Flink's lib directory: flink-dist_2.11-1.6.1.jar does bundle a gson-xxx package, but the GsonBuilder class inside it has no setLenient method. So I created a commlib directory on the server, placed gson-2.8.0.jar (which does contain setLenient) in it, and specified that classpath when submitting with flink run.
    2. Logging conflict
    Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/opt/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory
    Excluding Logback from the Spring Boot starter resolves it:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    3. When submitting a job to YARN with flink run, each classpath entry must point at a concrete jar; specifying a directory does not work. So, assuming all dependency jars are already collected in one directory, a shell snippet that assembles the classpath arguments can be written like this:

    #!/bin/bash
    lib_classpath=""
    for jar in `ls /home/hadoop/lib`
    do
        jar_suffix=${jar##*.}
        if [ "$jar_suffix" = "jar" ]
        then
            jar_path=" --classpath file:///home/hadoop/lib/$jar "
            lib_classpath=${lib_classpath}${jar_path}
        else
            echo "file $jar is not a legal jar file, skipping append"
        fi
    done

    The assembled lib_classpath then looks like this and can be appended to the flink run command line:

    --classpath file:///home/hadoop/lib/accessors-smart-1.2.jar --classpath file:///home/hadoop/lib/akka-actor_2.11-2.4.20.jar

    Note: if the jars live on the local filesystem, every machine in the cluster needs its own copy.