Flink 1.10.0 版，Hive 1.2.1 版
参考博客：Flink 对 Hive 的支持
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for a Flink 1.10.0 example project that talks to Hive 1.2.1.
  All Flink, Hive and Hadoop artifact versions are driven by the properties below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink_example</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.10.0</flink.version>
        <!-- Scala suffix shared by every Scala-dependent Flink artifact below. -->
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
        <hive.version>1.2.1</hive.version>
        <hadoop.version>2.8.5</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.jolbox</groupId>
            <artifactId>bonecp</artifactId>
            <version>0.8.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>parquet-hive-bundle</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hadoop-client is declared once; a second identical declaration was redundant. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-cli</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-common</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-service</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-shims</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-8.0</version>
        </dependency>

        <!-- Flink Hive connector and Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiles Scala sources into class files. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Both goals are listed so that main sources are compiled as well as tests. -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Builds a single jar-with-dependencies during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Copies dependencies (transitive ones included, version suffix stripped) into ./lib. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <excludeTransitive>false</excludeTransitive>
                    <stripVersion>true</stripVersion>
                    <outputDirectory>./lib</outputDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码01:
package org.caip;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.catalog.hive.HiveCatalog;public class ReadHive {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().inStreamingMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);// Configuration configuration = tableEnv.getConfig().getConfiguration();// configuration.setString("streaming-mode","false");String name = "myhive";String defaultDatabase = "caip";String hiveConfDir = "/Users/gaozhen/IdeaProjects/flink/src/main/resources"; // a local pathString version = "1.2.1";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);tableEnv.registerCatalog("myhive", hive);tableEnv.useCatalog("myhive");String database[] = tableEnv.listDatabases();System.out.println(database);// set the HiveCatalog as the current catalog of the sessiontableEnv.useCatalog("myhive");}}
