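Presto is a distributed SQL query engine that is well suited to OLAP workloads. The official distribution does not ship an Oracle plugin, perhaps for licensing reasons, yet Oracle is still very widely used in practice, so it is worth walking through how such a plugin is developed. If you only need to deploy Presto and do not plan to do any development, you can clone the Presto fork I host on GitHub, then build and deploy it.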

Setting up the development environment

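The Presto GitHub home page already explains how to set up a development environment, so I will not repeat it here. Two things to watch for: Presto fails to build on Windows, and you must build Presto once before you start working on the source. I recommend IntelliJ IDEA as the IDE. If you have imported Presto into the IDE and can run "PrestoServer" successfully, you are already halfway there.

Someone has in fact already published a presto-oracle plugin on GitHub, but it only supports simple queries and cannot insert data into Oracle through Presto. It is also not integrated into the Presto source tree, so the plugin cannot be debugged together with Presto.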

Creating a new module

The official MySQL plugin already exists, so we can use it as a template. Create a new module in the Presto root directory with the following pom:

    <?xml version="1.0"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-root</artifactId>
            <version>0.157.2-SNAPSHOT</version>
        </parent>
        <artifactId>presto-oracle</artifactId>
        <description>Presto - Oracle Connector</description>
        <packaging>presto-plugin</packaging>
        <properties>
            <!--check license-->
            <air.main.basedir>${project.parent.basedir}</air.main.basedir>
        </properties>
        <dependencies>
            <dependency>
                <groupId>com.facebook.presto</groupId>
                <artifactId>presto-base-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>configuration</artifactId>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </dependency>
            <dependency>
                <groupId>com.google.inject</groupId>
                <artifactId>guice</artifactId>
            </dependency>
            <dependency>
                <groupId>javax.validation</groupId>
                <artifactId>validation-api</artifactId>
            </dependency>
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc6</artifactId>
            </dependency>
            <dependency>
                <groupId>javax.inject</groupId>
                <artifactId>javax.inject</artifactId>
            </dependency>
            <!-- Presto SPI -->
            <dependency>
                <groupId>com.facebook.presto</groupId>
                <artifactId>presto-spi</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>slice</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>units</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <scope>provided</scope>
            </dependency>
            <!-- for testing -->
            <dependency>
                <groupId>org.testng</groupId>
                <artifactId>testng</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>testing</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>json</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.facebook.presto</groupId>
                <artifactId>presto-main</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.facebook.presto</groupId>
                <artifactId>presto-tpch</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift.tpch</groupId>
                <artifactId>tpch</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.facebook.presto</groupId>
                <artifactId>presto-tests</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>io.airlift</groupId>
                <artifactId>testing-mysql-server</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <version>2.10</version>
                    <configuration>
                        <!--disable check dependency-->
                        <skip>true</skip>
                        <failOnWarning>${air.check.fail-dependency}</failOnWarning>
                        <ignoreNonCompile>true</ignoreNonCompile>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-checkstyle-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>validate</phase>
                            <goals>
                                <goal>check</goal>
                            </goals>
                            <configuration>
                                <!--disable check code style-->
                                <skip>true</skip>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

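Note that the Oracle JDBC driver cannot be found in the public Maven repositories, so you need to download the jar yourself and install it into your local Maven repository. Presto also enforces strict code style and dependency checks, and the build fails if either check does not pass. Because the Oracle JDBC dependency cannot be resolved from a public repository, the dependency check is bound to fail. I therefore disabled both the dependency check plugin and the code style check plugin in the pom.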

1. Disable the dependency check

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>2.10</version>
        <configuration>
            <!--disable check dependency-->
            <skip>true</skip>
            <failOnWarning>${air.check.fail-dependency}</failOnWarning>
            <ignoreNonCompile>true</ignoreNonCompile>
        </configuration>
    </plugin>

2. Disable the code style check

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <executions>
            <execution>
                <phase>validate</phase>
                <goals>
                    <goal>check</goal>
                </goals>
                <configuration>
                    <!--disable check code style-->
                    <skip>true</skip>
                </configuration>
            </execution>
        </executions>
    </plugin>

3. Add the ojdbc dependency to presto-root

    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc6</artifactId>
        <version>11.2.0.4.0-atlassian-hosted</version>
    </dependency>

Integrating the module into Presto

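Because we are developing against the source tree, a few more settings are needed so that presto-oracle can be tested, packaged, and released as part of Presto: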

1. Modify the config.properties file

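The config.properties file is under "presto/presto-main/etc". Append "../presto-oracle/pom.xml" to the plugin.bundles entry.
Presto loads the presto-oracle plugin while debugging in the IDE only if the plugin's pom is listed there; otherwise the plugin is not loaded. This setting is for the development environment only and is not needed in production.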

2. Modify the presto.xml file

The presto.xml file is under "presto/presto-server/src/main/provisio". Add the following configuration:

    <artifactSet to="plugin/oracle">
        <artifact id="${project.groupId}:presto-oracle:zip:${project.version}">
            <unpack />
        </artifact>
    </artifactSet>

With this configuration, the Presto build places our presto-oracle plugin under the plugin directory.

Writing the code

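The plugin consists of just four classes and is not complicated at all. Note, however, that every source file must carry the license header: Presto configures a license check plugin, and the build fails if a file lacks the header. This check is less troublesome than the code style check, which fails on the slightest irregularity. For the license check it is enough to add the header to each class you create.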

1. OraclePlugin.java

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;

    import com.facebook.presto.plugin.jdbc.JdbcPlugin;

    public class OraclePlugin
            extends JdbcPlugin
    {
        public OraclePlugin()
        {
            super("oracle", new OracleClientModule());
        }
    }

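The "oracle" string passed to the constructor above should be the value used later for the connector name setting in the catalog configuration. I have not verified this guess, so configure it this way for now.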

2. OracleConfig.java

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;

    import io.airlift.configuration.Config;

    public class OracleConfig {
        private String user;
        private String password;
        private String url;

        public String getUser() {
            return user;
        }

        @Config("oracle.user")
        public OracleConfig setUser(String user) {
            this.user = user;
            return this;
        }

        public String getPassword() {
            return password;
        }

        @Config("oracle.password")
        public OracleConfig setPassword(String password) {
            this.password = password;
            return this;
        }

        public String getUrl() {
            return url;
        }

        // Each setter needs its own property name
        @Config("oracle.url")
        public OracleConfig setUrl(String url) {
            this.url = url;
            return this;
        }
    }

Do not omit the annotations in the code above. Without them, Presto cannot find these properties when it loads the catalog.
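To make the role of the annotations concrete, here is a minimal sketch of annotation-driven property binding. It is not airlift's actual implementation: `ConfigKey`, `DemoOracleConfig`, and `bind` are hypothetical names declared locally so the example runs with the JDK alone, and the property values are made up. It only illustrates the idea that a setter is reachable from a property file through the key in its annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ConfigBindingSketch
{
    // Hypothetical stand-in for io.airlift.configuration.Config
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ConfigKey
    {
        String value();
    }

    // Trimmed-down class with the same shape as OracleConfig
    static class DemoOracleConfig
    {
        private String user;
        private String url;

        public String getUser() { return user; }

        @ConfigKey("oracle.user")
        public DemoOracleConfig setUser(String user) { this.user = user; return this; }

        public String getUrl() { return url; }

        @ConfigKey("oracle.url")
        public DemoOracleConfig setUrl(String url) { this.url = url; return this; }
    }

    // Calls every setter whose @ConfigKey value is present in the properties.
    // A setter without the annotation is never called, so its field stays null.
    static <T> T bind(T target, Map<String, String> properties)
    {
        for (Method method : target.getClass().getMethods()) {
            ConfigKey key = method.getAnnotation(ConfigKey.class);
            if (key != null && properties.containsKey(key.value())) {
                try {
                    method.invoke(target, properties.get(key.value()));
                }
                catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Cannot bind " + key.value(), e);
                }
            }
        }
        return target;
    }

    public static void main(String[] args)
    {
        Map<String, String> properties = new HashMap<>();
        properties.put("oracle.user", "scott");
        properties.put("oracle.url", "jdbc:oracle:thin:@//localhost:1521/demo");
        DemoOracleConfig config = bind(new DemoOracleConfig(), properties);
        System.out.println(config.getUser() + " -> " + config.getUrl());
    }
}
```

In this sketch, two setters that share one key would both receive the same value, which is why each setter should carry its own property name.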

3. OracleClientModule.java

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;

    import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
    import com.facebook.presto.plugin.jdbc.JdbcClient;
    import com.google.inject.Binder;
    import com.google.inject.Module;
    import com.google.inject.Scopes;

    import static io.airlift.configuration.ConfigBinder.configBinder;

    public class OracleClientModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            binder.bind(JdbcClient.class).to(OracleClient.class).in(Scopes.SINGLETON);
            configBinder(binder).bindConfig(BaseJdbcConfig.class);
            configBinder(binder).bindConfig(OracleConfig.class);
        }
    }

4. OracleClient.java

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;

    import com.facebook.presto.plugin.jdbc.*;
    import com.facebook.presto.spi.*;
    import com.facebook.presto.spi.type.Type;
    import com.google.common.base.Joiner;
    import com.google.common.base.Throwables;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableSet;
    import oracle.jdbc.OracleDriver;

    import javax.annotation.Nullable;
    import javax.inject.Inject;

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.UUID;

    import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
    import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
    import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
    import static com.google.common.collect.Iterables.getOnlyElement;
    import static com.google.common.collect.Maps.fromProperties;
    import static java.util.Locale.ENGLISH;

    public class OracleClient
            extends BaseJdbcClient
    {
        @Inject
        public OracleClient(JdbcConnectorId connectorId, BaseJdbcConfig config, OracleConfig oracleConfig)
                throws SQLException
        {
            // The third argument is the identifier quote string; it is left empty here
            super(connectorId, config, "", new OracleDriver());
        }

        // List the Oracle schemas, lower-cased for Presto
        @Override
        public Set<String> getSchemaNames() {
            try (Connection connection = driver.connect(connectionUrl, connectionProperties);
                    ResultSet resultSet = connection.getMetaData().getSchemas()) {
                ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
                while (resultSet.next()) {
                    String schemaName = resultSet.getString(1).toLowerCase();
                    schemaNames.add(schemaName);
                }
                return schemaNames.build();
            } catch (SQLException e) {
                throw Throwables.propagate(e);
            }
        }

        // Look up synonyms as well as tables
        @Override
        protected ResultSet getTables(Connection connection, String schemaName, String tableName) throws SQLException {
            return connection.getMetaData().getTables(null, schemaName, tableName,
                    new String[] { "TABLE", "SYNONYM" });
        }

        @Nullable
        @Override
        public JdbcTableHandle getTableHandle(SchemaTableName schemaTableName) {
            try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
                DatabaseMetaData metadata = connection.getMetaData();
                String jdbcSchemaName = schemaTableName.getSchemaName();
                String jdbcTableName = schemaTableName.getTableName();
                // Presto names are lower case; convert them when the database stores upper case
                if (metadata.storesUpperCaseIdentifiers()) {
                    jdbcSchemaName = jdbcSchemaName.toUpperCase();
                    jdbcTableName = jdbcTableName.toUpperCase();
                }
                try (ResultSet resultSet = getTables(connection, jdbcSchemaName, jdbcTableName)) {
                    List<JdbcTableHandle> tableHandles = new ArrayList<>();
                    while (resultSet.next()) {
                        tableHandles.add(new JdbcTableHandle(connectorId,
                                schemaTableName,
                                resultSet.getString("TABLE_CAT"),
                                resultSet.getString("TABLE_SCHEM"),
                                resultSet.getString("TABLE_NAME")));
                    }
                    if (tableHandles.isEmpty()) {
                        return null;
                    }
                    if (tableHandles.size() > 1) {
                        throw new PrestoException(NOT_SUPPORTED,
                                "Multiple tables matched: " + schemaTableName);
                    }
                    return getOnlyElement(tableHandles);
                }
            } catch (SQLException e) {
                throw Throwables.propagate(e);
            }
        }

        @Override
        public List<JdbcColumnHandle> getColumns(JdbcTableHandle tableHandle) {
            try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
                // Ask the Oracle driver to return column metadata for synonyms too
                ((oracle.jdbc.driver.OracleConnection) connection).setIncludeSynonyms(true);
                DatabaseMetaData metadata = connection.getMetaData();
                String schemaName = tableHandle.getSchemaName().toUpperCase();
                String tableName = tableHandle.getTableName().toUpperCase();
                try (ResultSet resultSet = metadata.getColumns(null, schemaName, tableName, null)) {
                    List<JdbcColumnHandle> columns = new ArrayList<>();
                    boolean found = false;
                    while (resultSet.next()) {
                        found = true;
                        Type columnType = toPrestoType(resultSet.getInt("DATA_TYPE"),
                                resultSet.getInt("COLUMN_SIZE"));
                        // Columns whose type has no Presto mapping are skipped
                        if (columnType != null) {
                            String columnName = resultSet.getString("COLUMN_NAME");
                            columns.add(new JdbcColumnHandle(connectorId, columnName, columnType));
                        }
                    }
                    if (!found) {
                        throw new TableNotFoundException(tableHandle.getSchemaTableName());
                    }
                    if (columns.isEmpty()) {
                        throw new PrestoException(NOT_SUPPORTED,
                                "Table has no supported column types: " + tableHandle.getSchemaTableName());
                    }
                    return ImmutableList.copyOf(columns);
                }
            } catch (SQLException e) {
                throw Throwables.propagate(e);
            }
        }

        @Override
        public List<SchemaTableName> getTableNames(@Nullable String schema) {
            try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
                DatabaseMetaData metadata = connection.getMetaData();
                if (metadata.storesUpperCaseIdentifiers() && (schema != null)) {
                    schema = schema.toUpperCase();
                }
                try (ResultSet resultSet = getTables(connection, schema, null)) {
                    ImmutableList.Builder<SchemaTableName> list = ImmutableList.builder();
                    while (resultSet.next()) {
                        list.add(getSchemaTableName(resultSet));
                    }
                    return list.build();
                }
            } catch (SQLException e) {
                throw Throwables.propagate(e);
            }
        }

        // Convert the names reported by Oracle back to lower case for Presto
        @Override
        protected SchemaTableName getSchemaTableName(ResultSet resultSet) throws SQLException {
            String tableSchema = resultSet.getString("TABLE_SCHEM");
            String tableName = resultSet.getString("TABLE_NAME");
            if (tableSchema != null) {
                tableSchema = tableSchema.toLowerCase();
            }
            if (tableName != null) {
                tableName = tableName.toLowerCase();
            }
            return new SchemaTableName(tableSchema, tableName);
        }

        // Rename the temporary table to its final name
        @Override
        public void commitCreateTable(JdbcOutputTableHandle handle) {
            StringBuilder sql = new StringBuilder()
                    .append("ALTER TABLE ")
                    .append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()))
                    .append(" RENAME TO ")
                    // the new table name must not be qualified with catalog or schema
                    .append(handle.getTableName());
            try (Connection connection = getConnection(handle)) {
                execute(connection, sql.toString());
            }
            catch (SQLException e) {
                throw new PrestoException(JDBC_ERROR, e);
            }
        }
    }
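The case handling that recurs in the listing above (upper case on the way to Oracle, lower case on the way back to Presto) can be isolated in a small sketch. The class and method names here are hypothetical and not part of the plugin. One deliberate difference: the sketch passes `Locale.ENGLISH` explicitly, as the `beginWriteTable` method of `BaseJdbcClient` does, because the no-argument `toUpperCase()` depends on the JVM default locale (under a Turkish locale, for example, `"i"` does not map to `"I"`).

```java
import java.util.Locale;

class IdentifierCaseSketch
{
    // Presto -> Oracle: upper-case the name only when the database reports
    // that it stores identifiers in upper case
    static String toOracleName(String prestoName, boolean storesUpperCase)
    {
        return storesUpperCase ? prestoName.toUpperCase(Locale.ENGLISH) : prestoName;
    }

    // Oracle -> Presto: lower-case the reported name; null is passed through
    static String toPrestoName(String oracleName)
    {
        return oracleName == null ? null : oracleName.toLowerCase(Locale.ENGLISH);
    }

    public static void main(String[] args)
    {
        System.out.println(toOracleName("scott", true));
        System.out.println(toPrestoName("EMP"));
    }
}
```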

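The code above overrides many methods, mainly because each database has its own rules and we have to adapt to them one by one. As mentioned earlier, someone has already published a presto-oracle plugin on GitHub, but it is not fully adapted. For example, I additionally overrode commitCreateTable, because when renaming a table Oracle requires the new table name to be given without a schema prefix; otherwise it reports an error.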
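The shape of the rename statement built by commitCreateTable can be sketched without a database. The `quoted` helper below is a simplified, hypothetical stand-in for the one inherited from `BaseJdbcClient` (it does not escape embedded quote characters), and the schema and table names are made up. The old name is qualified; the new name is appended bare.

```java
class RenameSqlSketch
{
    // Simplified stand-in: wraps each non-empty part in the quote string
    // and joins the parts with dots
    static String quoted(String quote, String catalog, String schema, String table)
    {
        StringBuilder sb = new StringBuilder();
        for (String part : new String[] {catalog, schema}) {
            if (part != null && !part.isEmpty()) {
                sb.append(quote).append(part).append(quote).append('.');
            }
        }
        return sb.append(quote).append(table).append(quote).toString();
    }

    // The source table is schema-qualified; the target name is not
    static String renameSql(String quote, String catalog, String schema, String temporaryName, String newName)
    {
        return "ALTER TABLE " + quoted(quote, catalog, schema, temporaryName) + " RENAME TO " + newName;
    }

    public static void main(String[] args)
    {
        // Empty quote string, matching what OracleClient passes to its superclass
        System.out.println(renameSql("", null, "SCOTT", "tmp_presto_0123", "EMP_COPY"));
    }
}
```

With an empty quote string the statement comes out as `ALTER TABLE SCOTT.tmp_presto_0123 RENAME TO EMP_COPY`.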

5. BaseJdbcClient.java

"BaseJdbcClient" is the base class of "OracleClient". One of its methods creates a temporary table, and Oracle limits table names to 30 characters, so I truncate the temporary table name there.

    private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetadata)
    {
        SchemaTableName schemaTableName = tableMetadata.getTable();
        String schema = schemaTableName.getSchemaName();
        String table = schemaTableName.getTableName();

        if (!getSchemaNames().contains(schema)) {
            throw new PrestoException(NOT_FOUND, "Schema not found: " + schema);
        }

        try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
            boolean uppercase = connection.getMetaData().storesUpperCaseIdentifiers();
            if (uppercase) {
                schema = schema.toUpperCase(ENGLISH);
                table = table.toUpperCase(ENGLISH);
            }
            String catalog = connection.getCatalog();

            String temporaryName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
            // Oracle limits table names to 30 characters, so keep only the first 29
            temporaryName = temporaryName.substring(0, 29);

            StringBuilder sql = new StringBuilder()
                    .append("CREATE TABLE ")
                    .append(quoted(catalog, schema, temporaryName))
                    .append(" (");
            ImmutableList.Builder<String> columnNames = ImmutableList.builder();
            ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
            ImmutableList.Builder<String> columnList = ImmutableList.builder();
            for (ColumnMetadata column : tableMetadata.getColumns()) {
                String columnName = column.getName();
                if (uppercase) {
                    columnName = columnName.toUpperCase(ENGLISH);
                }
                columnNames.add(columnName);
                columnTypes.add(column.getType());
                columnList.add(new StringBuilder()
                        .append(quoted(columnName))
                        .append(" ")
                        .append(toSqlType(column.getType()))
                        .toString());
            }
            Joiner.on(", ").appendTo(sql, columnList.build());
            sql.append(")");

            execute(connection, sql.toString());

            return new JdbcOutputTableHandle(
                    connectorId,
                    catalog,
                    schema,
                    table,
                    columnNames.build(),
                    columnTypes.build(),
                    temporaryName,
                    connectionUrl,
                    fromProperties(connectionProperties));
        }
        catch (SQLException e) {
            throw new PrestoException(JDBC_ERROR, e);
        }
    }
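The effect of the truncation can be checked in isolation. The sketch below repeats the two lines that build the temporary name; the class name and the constant are hypothetical, and the 30-character limit is the one stated in this article. The untruncated name is 43 characters long (an 11-character prefix plus 32 hex digits), so after the cut 18 hex digits of the UUID remain.

```java
import java.util.UUID;

class TemporaryTableNameSketch
{
    // Table name limit as described in the article
    static final int ORACLE_MAX_NAME_LENGTH = 30;

    static String temporaryName()
    {
        // "tmp_presto_" (11 chars) + UUID without dashes (32 chars) = 43 chars
        String full = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
        // Same cut as in beginWriteTable: keep the first 29 characters
        return full.substring(0, 29);
    }

    public static void main(String[] args)
    {
        String name = temporaryName();
        System.out.println(name + " (" + name.length() + " characters)");
    }
}
```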

Building and packaging

If everything above went well, rebuild Presto. A successful build produces two kinds of installation package: tar.gz and rpm.

1. tar.gz file

tar.gz file path: presto/presto-server/target

2. rpm file

rpm file path: presto/presto-server-rpm/target

If you run into problems with the steps above, you can refer to the Presto fork I host, or leave a comment and we can work through it together.