Storm/Trident集成JDBC.该包中包含的核心bolts 和 trident states ,允许storm topology把storm tuples插入数据库表中或者执行数据库查询,并且丰富了storm topology 中的tuples.

:在下面的示例中,我们使用com.google.common.collect.Lists和com.google.common.collect.Maps。

Inserting into a database. 插入数据库.

该包的中bolt 和 trident state可以将数据插入到数据库表中绑定到单个表。

ConnectionProvider

由不同的连接池机制实现的接口 org.apache.storm.jdbc.common.ConnectionProvider

java public interface ConnectionProvider extends Serializable { /** method must be idempotent. / void prepare();

  1. /**
  2. *
  3. * @return a DB connection over which the queries can be executed.
  4. */
  5. Connection getConnection();
  6. /**
  7. * called once when the system is shutting down, should be idempotent.
  8. */
  9. void cleanup();

} ```

即插即用,我们支持’org.apache.storm.jdbc.common.HikariCPConnectionProvider’,这是一个使用HikariCP的实现。

JdbcMapper

使用JDBC在表中插入数据的主要API是org.apache.storm.jdbc.mapper.JdbcMapper 接口:

  1. public interface JdbcMapper extends Serializable {
  2. List<Column> getColumns(ITuple tuple);
  3. }

getColumns() 方法定义了storm tuple如何映射到数据库中表示一行的列的列表。

返回的列表的顺序很重要。 查询中的占位符以与返回列表相同的顺序进行解析。

例如,如果用户的插入查询是 insert into user(user_id, user_name, create_date) values (?,?, now()) , getColumns 方法返回列表中的第一项将映射到第一位,第二位到第二位,依此类推。我们不会解析提供的查询,以列名称来尝试和解析占位符。没有对查询语法进行任何假设,允许这个连接器被一些非标准的sql框架(如仅支持upsert的Pheonix)使用。

JdbcInsertBolt

使用 JdbcInsertBolt,你可以通过指定一个 ConnectionProvider 实例和将storm tuple转换为DB行的 JdbcMapper 实例来构造一个 JdbcInsertBolt实例。另外,您必须使用 withTableName 方法提供表名或使用 withInsertQuery插入查询。如果您指定了一个插入查询,那么您应该确保您的 JdbcMapper实例将按照插入查询中的顺序返回一列列表。您可以选择指定一个查询超时秒参数,指定插入查询可以执行的最大秒数。默认设置为topology.message.timeout.secs的值为-1将表示不设置任何查询超时。 您应该将查询超时值设置为<= topology.message.timeout.secs。

  1. Map hikariConfigMap = Maps.newHashMap();
  2. hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
  3. hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
  4. hikariConfigMap.put("dataSource.user","root");
  5. hikariConfigMap.put("dataSource.password","password");
  6. ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
  7. String tableName = "user_details";
  8. JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
  9. JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
  10. .withTableName("user")
  11. .withQueryTimeoutSecs(30);
  12. Or
  13. JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
  14. .withInsertQuery("insert into user values (?,?)")
  15. .withQueryTimeoutSecs(30);

SimpleJdbcMapper

storm-jdbc 包括一个通用的 JdbcMapper 实现,称为 SimpleJdbcMapper ,可以映射Storm元组到数据库行。 SimpleJdbcMapper假定storm tuple中有与列名相同名称的字段您要写入的数据库表。

要使用 SimpleJdbcMapper,你只需要告诉你要写入的tableName并提供一个connectionProvider实例。

以下代码创建一个 SimpleJdbcMapper 实例:

1.允许映射器将 storm tuple转换为映射到表test.user_details中的行的列的列表。 2.将使用提供的HikariCP配置来建立具有指定数据库配置的连接池自动找出您要写入的表的列名称和相应的数据类型。 请参阅https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby了解有关hikari配置属性的更多信息。

  1. Map hikariConfigMap = Maps.newHashMap();
  2. hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
  3. hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
  4. hikariConfigMap.put("dataSource.user","root");
  5. hikariConfigMap.put("dataSource.password","password");
  6. ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
  7. String tableName = "user_details";
  8. JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

在上面的示例中初始化的映射器假定storm tuple具有要插入数据的表的所有列的值,其 getColumn方法将按照Jdbc连接实例的 connection.getMetaData().getColumns();的顺序返回列。

如果您为 JdbcInsertBolt 指定了自己的插入查询,则必须使用显式列显示方式初始化 SimpleJdbcMapper ,以使模式具有与插入查询相同顺序的列。

例如,如果您的插入查询是 Insert into user (user_id, user_name) values (?,?) ,那么您的 SimpleJdbcMapper 应该使用以下语句进行初始化: java List&lt;Column&gt; columnSchema = Lists.newArrayList( new Column("user_id", java.sql.Types.INTEGER), new Column("user_name", java.sql.Types.VARCHAR)); JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

如果您的 storm tuple仅具有子集列的字段i.e。如果表中的某些列具有默认值,并且您只想为没有默认值的列插入值,则可以通过显示的指定columnschema初始化SimpleJdbcMapper。例如,如果你有一个user_details表create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);在此表中,create_time列具有默认值。 要确保只插入没有默认值的列你可以初始化jdbcMapper 如下:

  1. List<Column> columnSchema = Lists.newArrayList(
  2. new Column("user_id", java.sql.Types.INTEGER),
  3. new Column("user_name", java.sql.Types.VARCHAR),
  4. new Column("dept_name", java.sql.Types.VARCHAR));
  5. JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

JdbcTridentState

我们还支持持久化trident state 通过使用trident topologies。要创建一个jdbc 持久化的tridentstate,您需要使用表名或插入查询、JdbcMapper实例和连接提供程序实例进行初始化。见下面的例子:

  1. JdbcState.Options options = new JdbcState.Options()
  2. .withConnectionProvider(connectionProvider)
  3. .withMapper(jdbcMapper)
  4. .withTableName("user_details")
  5. .withQueryTimeoutSecs(30);
  6. JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);

类似于 JdbcInsertBolt,你可以使用 withInsertQuery来指定一个自定义的插入查询,而不是指定一个表名。

Lookup from Database 查询数据库

我们支持数据库的 select 查询以丰富拓扑中的storm tuples。使用JDBC执行数据库查询的主要API是 org.apache.storm.jdbc.mapper.JdbcLookupMapper 接口:

  1. void declareOutputFields(OutputFieldsDeclarer declarer);
  2. List<Column> getColumns(ITuple tuple);
  3. List<Values> toTuple(ITuple input, List<Column> columns);

declareOutputFields 方法用于指明将作为处理storm tuple的输出tuple的一部分发出哪些字段。

getColumns 方法指定选择查询中的占位符列及其SQL类型和要使用的值。例如在上面提到的user_details表中,如果你正在执行一个查询’select user_name from user_details where user_id =?and create_time>?getColumns方法将需要一个storm输入tuple,并返回一个包含两个项目的列表。 Column类型的 getValue()方法的第一个实例将被用作 user_id的值进行查找, Column类型的 getValue()方法的第二个实例将被用作 create_time的值。 注意:返回的列表中的顺序决定了占位符的价值。 换句话说,列表中的第一个项目映射在select查询中第一个’?’,第二个项目是第二个’?’,依次类推。

toTuple 方法将select查询的结果接收输入tuple和表示DB行的列的列表,并返回要发射的值的列表。

请注意,它返回一个 Values 列表,而不仅仅是一个 Values 的实例。 这允许将单个DB行映射到多个输出storm tuples。

SimpleJdbcLookupMapper

storm-jdbc包括一个通用的JdbcLookupMapper实现,叫做 SimpleJdbcLookupMapper

要使用 SimpleJdbcMapper,必须使用您的bolt输出的字段和查询语句中占位符的列的列表来初始化它。以下示例展示如何初始化一个 SimpleJdbcLookupMapper,它将’ user_id,user_name,create_date 声明为输出字段, user_id作为查询语句中的占位符列。SimpleJdbcMapper 假定您的tuple中的字段名称等于占位符列名称,即在我们的示例中,SimpleJdbcMapper 将在输入tuple中查找一个字段 use_id ,并将其值用作查询语句中占位符的值。对于构造输出tuple,它首先在输入元组中查找outputFields中指定的字段,如果在输入元组中找不到,那么它会查看select query的输出行中与列名称相同的列。所以在下面的例子中,如果输入tuple有字段 user_id, create_date ,查询语句是select user_name from user_details where user_id = ?,对于每个输入tuple SimpleJdbcLookupMapper.getColumns(tuple)将返回tuple.getValueByField("user_id") 用作select查询的? 中的值。对于DB的每个输出行,SimpleJdbcLookupMapper.toTuple()将使用输入元组中的 user_id, create_date,因为从结果行只添加user_name,并将这3个字段作为单个输出元组返回。

  1. Fields outputFields = new Fields("user_id", "user_name", "create_date");
  2. List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
  3. this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);

JdbcLookupBolt

要使用 JdbcLookupBolt,使用 ConnectionProvider实例, JdbcLookupMapper实例和select查询来构造一个它的实例。你可以选择指定一个查询超时秒参数,指定select查询可以采用的最大秒数。默认值为topology.message.timeout.secs的值。 您应该将此值设置为<= topology.message.timeout.secs。

  1. String selectSql = "select user_name from user_details where user_id = ?";
  2. SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
  3. JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
  4. .withQueryTimeoutSecs(30);

JdbcTridentState for lookup

我们还可以在trident topologies 中查询trident state。

  1. JdbcState.Options options = new JdbcState.Options()
  2. .withConnectionProvider(connectionProvider)
  3. .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
  4. .withSelectQuery("select user_name from user_details where user_id = ?");
  5. .withQueryTimeoutSecs(30);

Example:

可以在 src/test/java/topology 目录中找到一个可运行的例子。

Setup

  • 确保您为您选择的数据库包含JDBC实现依赖关系,作为构建配置的一部分。

  • 测试拓扑执行以下查询,因此您的预期数据库必须支持这些查询才能使测试拓扑工作。

  1. create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
  2. create table if not exists department (dept_id integer, dept_name varchar(100));
  3. create table if not exists user_department (user_id integer, dept_id integer);
  4. insert into department values (1, 'R&D');
  5. insert into department values (2, 'Finance');
  6. insert into department values (3, 'HR');
  7. insert into department values (4, 'Sales');
  8. insert into user_department values (1, 1);
  9. insert into user_department values (2, 2);
  10. insert into user_department values (3, 3);
  11. insert into user_department values (4, 4);
  12. select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;

Execution

使用storm jar命令运行org.apache.storm.jdbc.topology.UserPersistanceTopology类。 参考该课程第5小节storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology [拓扑名称]

要使其与Mysql一起工作,您可以将以下内容添加到pom.xml中

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>5.1.31</version>
  5. </dependency>

您可以使用maven程序集插件生成具有依赖关系的单个jar。 要使用插件,将以下内容添加到您的pom.xml并执行 mvn clean compile assembly:single

  1. <plugin>
  2. <artifactId>maven-assembly-plugin</artifactId>
  3. <configuration>
  4. <archive>
  5. <manifest>
  6. <mainClass>fully.qualified.MainClass</mainClass>
  7. </manifest>
  8. </archive>
  9. <descriptorRefs>
  10. <descriptorRef>jar-with-dependencies</descriptorRef>
  11. </descriptorRefs>
  12. </configuration>
  13. </plugin>

Mysql Example: storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology

您可以针对应显示新插入的行的用户表执行select查询:

  1. select * from user;

For trident you can view org.apache.storm.jdbc.topology.UserPersistanceTridentTopology.

对于trident,您可以查看org.apache.storm.jdbc.topology.UserPersistanceTridentTopology