Storm Cassandra Integration

Bolt API implementation for Apache Cassandra

This library provides core Storm bolts on top of Apache Cassandra. It offers a simple DSL to map a Storm Tuple to a Cassandra Query Language (CQL) Statement.

Configuration

The following properties may be passed to the Storm configuration.

Property name                              Description   Default
cassandra.keyspace                         -
cassandra.nodes                            -             {"localhost"}
cassandra.username                         -             -
cassandra.password                         -             -
cassandra.port                             -             9092
cassandra.output.consistencyLevel          -             ONE
cassandra.batch.size.rows                  -             100
cassandra.retryPolicy                      -             DefaultRetryPolicy
cassandra.reconnectionPolicy.baseDelayMs   -             100 (ms)
cassandra.reconnectionPolicy.maxDelayMs    -             60000 (ms)
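
Storm's topology configuration is a string-keyed map, so the properties above can be set like any other entry. The following is a minimal sketch that collects a few of them in a plain HashMap to stay self-contained. The keyspace name and all values are illustrative assumptions, and the value types the library expects (string or integer) are not stated in this document.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: gathers some property keys from the table into a plain map.
final class CassandraConfSketch {

    // All values here ("music", "QUORUM", 500, ...) are assumptions for illustration.
    static Map<String, Object> cassandraSettings() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("cassandra.keyspace", "music");
        conf.put("cassandra.output.consistencyLevel", "QUORUM");
        conf.put("cassandra.batch.size.rows", 500);
        conf.put("cassandra.reconnectionPolicy.baseDelayMs", 200);
        conf.put("cassandra.reconnectionPolicy.maxDelayMs", 30000);
        return conf;
    }

    public static void main(String[] args) {
        // Print each entry so the resulting configuration can be inspected.
        cassandraSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Entries like these would be copied into the topology configuration before the topology is submitted. Properties that are left out keep the defaults listed in the table.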

CassandraWriterBolt

Static import

    import static org.apache.storm.cassandra.DynamicStatementBuilder.*;

Insert Query Builder

Insert query including only the specified tuple fields.

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with(
                    fields("title", "year", "performer", "genre", "tracks")
                )
        )
    );

Insert query including all tuple fields.

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with(all())
        )
    );

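The difference between fields(...) and all() in the two examples above can be pictured with a small stand-alone model. This is a conceptual sketch, not the library's code: the tuple is modelled as an ordered map, and the class, its methods and the sample values are hypothetical. It assumes one selected value is bound to each ? placeholder, in order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of field selection; none of these names come from storm-cassandra.
final class FieldSelectionSketch {

    // Picks the named fields in the order given, one value per '?' placeholder.
    static List<Object> selectFields(Map<String, Object> tuple, String... names) {
        List<Object> values = new ArrayList<>();
        for (String name : names) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("Tuple has no field named " + name);
            }
            values.add(tuple.get(name));
        }
        return values;
    }

    // Takes every field of the tuple in its declared order.
    static List<Object> selectAll(Map<String, Object> tuple) {
        return new ArrayList<>(tuple.values());
    }

    // Sample tuple with made-up values; LinkedHashMap keeps the field order.
    static Map<String, Object> sampleTuple() {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("title", "Example Album");
        tuple.put("year", 2001);
        tuple.put("performer", "Example Performer");
        tuple.put("genre", "jazz");
        tuple.put("tracks", 9);
        return tuple;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = sampleTuple();
        System.out.println(selectFields(tuple, "title", "year"));
        System.out.println(selectAll(tuple));
    }
}
```

Under this model, all() only fits a query whose placeholders match the tuple's fields in number and order, while fields(...) lets the query use a subset or a different order.
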
Insert multiple queries from one input tuple.

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
        )
    );

Insert query using QueryBuilder.

    new CassandraWriterBolt(
        async(
            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .with(all())
        )
    );

Insert query with a static bound query.

    new CassandraWriterBolt(
        async(
            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
                .bind(all())
        )
    );

Insert query with a static bound query using named setters and aliases.

    new CassandraWriterBolt(
        async(
            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
                .bind(
                    field("ti").as("title"),
                    field("ye").as("year"),
                    field("pe").as("performer"),
                    field("ge").as("genre"),
                    field("tr").as("tracks")
                ).byNamedSetters()
        )
    );

Insert query with a bound statement loaded from the Storm configuration.

    new CassandraWriterBolt(
        boundQuery(named("insertIntoAlbum"))
            .bind(all()));

Insert query with a bound statement loaded from a tuple field.

    new CassandraWriterBolt(
        boundQuery(namedByField("cql"))
            .bind(all()));

Insert query with a batch statement.

    // Logged
    new CassandraWriterBolt(loggedBatch(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
        )
    );

    // Unlogged
    new CassandraWriterBolt(unLoggedBatch(
            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
        )
    );
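
In plain CQL, the two batch kinds above correspond to BEGIN BATCH and BEGIN UNLOGGED BATCH. The sketch below only assembles that CQL text to make the difference visible. It is not how the bolt builds its statements, which this document does not describe, and the class and method names are hypothetical.

```java
import java.util.List;

// Illustration of CQL batch syntax; not part of storm-cassandra.
final class BatchCqlSketch {

    // Wraps the given statements (each ending in ';') in a logged or unlogged batch.
    static String batch(boolean logged, List<String> statements) {
        StringBuilder cql = new StringBuilder(logged ? "BEGIN BATCH\n" : "BEGIN UNLOGGED BATCH\n");
        for (String statement : statements) {
            cql.append("  ").append(statement).append('\n');
        }
        return cql.append("APPLY BATCH;").toString();
    }

    public static void main(String[] args) {
        List<String> inserts = List.of(
            "INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);",
            "INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);");
        System.out.println(batch(true, inserts));
        System.out.println(batch(false, inserts));
    }
}
```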

Handling query execution results

The ExecutionResultHandler interface can be used to customise how an execution result is handled.

    public interface ExecutionResultHandler extends Serializable {
        void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
        void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
        void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
        void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
        void onQuerySuccess(OutputCollector collector, Tuple tuple);
    }

By default, the CassandraBolt fails a tuple on every Cassandra exception (see BaseExecutionResultHandler).

    new CassandraWriterBolt(insertInto("album").values(with(all())).build())
        .withResultHandler(new MyCustomResultHandler());
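
The idea behind a custom handler, overriding one callback while inheriting the fail-by-default behaviour for the rest, can be sketched without any Storm or driver classes. Everything in the block below is a hypothetical stand-in: RecordingCollector replaces Storm's OutputCollector, a string id replaces the Tuple, and java.util.concurrent.TimeoutException replaces the driver's timeout exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Stand-alone model of result handling; these types are not the storm-cassandra API.
final class ResultHandlerSketch {

    // Stand-in for an output collector: records what happened to each tuple.
    static final class RecordingCollector {
        final List<String> events = new ArrayList<>();
        void ack(String tupleId) { events.add("ack:" + tupleId); }
        void fail(String tupleId) { events.add("fail:" + tupleId); }
    }

    // Base behaviour: every error fails the tuple, success acks it.
    static class FailingHandler {
        void onTimeout(TimeoutException e, RecordingCollector c, String tupleId) { c.fail(tupleId); }
        void onOtherError(Exception e, RecordingCollector c, String tupleId) { c.fail(tupleId); }
        void onSuccess(RecordingCollector c, String tupleId) { c.ack(tupleId); }
    }

    // Custom behaviour: timeouts are acked (so not replayed); the rest is inherited.
    static final class AckOnTimeoutHandler extends FailingHandler {
        @Override
        void onTimeout(TimeoutException e, RecordingCollector c, String tupleId) { c.ack(tupleId); }
    }

    // Routes the outcome of one query to the matching callback; null means success.
    static void dispatch(Exception error, FailingHandler handler, RecordingCollector c, String tupleId) {
        if (error == null) {
            handler.onSuccess(c, tupleId);
        } else if (error instanceof TimeoutException) {
            handler.onTimeout((TimeoutException) error, c, tupleId);
        } else {
            handler.onOtherError(error, c, tupleId);
        }
    }

    // Runs three outcomes through the two handlers and returns the recorded events.
    static List<String> demo() {
        RecordingCollector c = new RecordingCollector();
        dispatch(new TimeoutException("write timed out"), new FailingHandler(), c, "t1");
        dispatch(new TimeoutException("write timed out"), new AckOnTimeoutHandler(), c, "t2");
        dispatch(null, new AckOnTimeoutHandler(), c, "t3");
        return c.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real topology the same shape would presumably be achieved by extending BaseExecutionResultHandler and overriding only the callbacks whose behaviour should differ.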

Declare Output fields

A CassandraBolt can declare output fields and stream output fields. For instance, this may be used to emit a new tuple on error, or to chain queries.

    new CassandraWriterBolt(insertInto("album").values(withFields(all())).build())
        .withResultHandler(new EmitOnDriverExceptionResultHandler())
        .withStreamOutputFields("stream_error", new Fields("message"));

    public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
        @Override
        protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
            LOG.error("An error occurred while executing cassandra statement", e);
            // Emit the error message on the dedicated stream, then ack the input tuple.
            collector.emit("stream_error", new Values(e.getMessage()));
            collector.ack(tuple);
        }
    }

Murmur3StreamGrouping

Murmur3StreamGrouping can be used to optimise Cassandra writes. The stream is partitioned among the bolt's tasks based on the specified row partition keys.

    CassandraWriterBolt bolt = new CassandraWriterBolt(
        insertInto("album")
            .values(
                with(fields("title", "year", "performer", "genre", "tracks"))
            ).build());

    builder.setBolt("BOLT_WRITER", bolt, 4)
        .customGrouping("spout", new Murmur3StreamGrouping("title"));
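
Partitioning a stream by key comes down to hashing the partition key and mapping the hash onto the available tasks, so that every tuple with the same key reaches the same task. The sketch below shows that idea only. It uses CRC32 from the JDK as a stand-in hash, not Murmur3, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Key-based task selection with a stand-in hash; not the Murmur3StreamGrouping code.
final class KeyGroupingSketch {

    // Maps a partition key to a task index in the range [0, numTasks).
    static int chooseTask(String partitionKey, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        CRC32 crc = new CRC32(); // stand-in for Murmur3, chosen because it ships with the JDK
        crc.update(partitionKey.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long below 2^32, so the remainder is non-negative.
        return (int) (crc.getValue() % numTasks);
    }

    public static void main(String[] args) {
        // With 4 tasks, as in the setBolt call above, equal titles always map to one task.
        for (String title : new String[] {"Album A", "Album B", "Album A"}) {
            System.out.println(title + " -> task " + chooseTask(title, 4));
        }
    }
}
```

Because equal keys always land on the same task, writes for one Cassandra partition are issued from a single place, which is presumably what makes the grouping useful for write optimisation.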

Trident API support

storm-cassandra supports the Trident state API for inserting data into Cassandra.

    CassandraState.Options options = new CassandraState.Options(new CassandraContext());
    CQLStatementTupleMapper insertTemperatureValues = boundQuery(
            "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
            .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
    options.withCQLStatementTupleMapper(insertTemperatureValues);
    CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);

    // selectWeatherStationStateFactory is the state factory for the SELECT query (see the querying example).
    TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
    stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
    stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
    stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());

The state API below is used for querying data from Cassandra.

    CassandraState.Options options = new CassandraState.Options(new CassandraContext());
    CQLStatementTupleMapper selectWeatherStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
            .bind(with(field("weather_station_id").as("id")));
    options.withCQLStatementTupleMapper(selectWeatherStationName);
    options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
    CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);

    TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
    stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));