Storm/Trident集成MongoDB。该包中包括核心bolts和trident states,允许storm topology将storm tuples插入到数据库集合中,或者针storm topology中的数据库集合执行更新查询。

Insert into Database

此包中包含用于将数据插入数据库集合的bolt和trident state。

MongoMapper

使用MongoDB在集合中插入数据的主要API是 org.apache.storm.mongodb.common.mapper.MongoMapper 接口:

  1. public interface MongoMapper extends Serializable {
  2. Document toDocument(ITuple tuple);
  3. }

SimpleMongoMapper

storm-mongodb包括一个通用的MongoMapper实现,称为SimpleMongoMapper,可以将Storm元组映射到一个数据库文件。 SimpleMongoMapper假定storm tuple具有与您要写入的数据库集合中的文档字段名称相同的字段。

  1. public class SimpleMongoMapper implements MongoMapper {
  2. private String[] fields;
  3. @Override
  4. public Document toDocument(ITuple tuple) {
  5. Document document = new Document();
  6. for(String field : fields){
  7. document.append(field, tuple.getValueByField(field));
  8. }
  9. return document;
  10. }
  11. public SimpleMongoMapper withFields(String... fields) {
  12. this.fields = fields;
  13. return this;
  14. }
  15. }

MongoInsertBolt

要使用MongoInsertBolt,您可以通过指定url,collectionName和将 storm tuple转换为DB文档的 MongoMapper实现来构造它的一个实例。 以下是标准的URI连接方案: mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

有关Mongo URI的更多选项信息(例如:写关注选项),您可以访问 https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options

  1. String url = "mongodb://127.0.0.1:27017/test";
  2. String collectionName = "wordcount";
  3. MongoMapper mapper = new SimpleMongoMapper()
  4. .withFields("word", "count");
  5. MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);

MongoTridentState

我们还支持在trident topologies中持久化trident state 。 要创建一个Mongo持久的trident state,您需要使用url,collectionName,“MongoMapper”实例初始化它。 见下面的例子:

  1. MongoMapper mapper = new SimpleMongoMapper()
  2. .withFields("word", "count");
  3. MongoState.Options options = new MongoState.Options()
  4. .withUrl(url)
  5. .withCollectionName(collectionName)
  6. .withMapper(mapper);
  7. StateFactory factory = new MongoStateFactory(options);
  8. TridentTopology topology = new TridentTopology();
  9. Stream stream = topology.newStream("spout1", spout);
  10. stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());

NOTE:

如果没有提供唯一的索引,在发生故障的情况下,trident state插入可能会导致重复的文档。

Update from Database

包中包含用于从数据库集合更新数据的bolt。

SimpleMongoUpdateMapper

storm-mongodb包括一个通用的MongoMapper实现,称为SimpleMongoUpdateMapper,可以将Storm元组映射到数据库文档。 SimpleMongoUpdateMapper假定风暴元组具有与您要写入的数据库集合中的文档字段名称相同的字段。 SimpleMongoUpdateMapper使用$ set运算符来设置文档中字段的值。 有关更新操作的更多信息,可以访问 https://docs.mongodb.org/manual/reference/operator/update/

  1. public class SimpleMongoUpdateMapper implements MongoMapper {
  2. private String[] fields;
  3. @Override
  4. public Document toDocument(ITuple tuple) {
  5. Document document = new Document();
  6. for(String field : fields){
  7. document.append(field, tuple.getValueByField(field));
  8. }
  9. return new Document("$set", document);
  10. }
  11. public SimpleMongoUpdateMapper withFields(String... fields) {
  12. this.fields = fields;
  13. return this;
  14. }
  15. }

QueryFilterCreator

用于创建MongoDB查询过滤器的主要API是 org.apache.storm.mongodb.common.QueryFilterCreator 接口:

  1. public interface QueryFilterCreator extends Serializable {
  2. Bson createFilter(ITuple tuple);
  3. }

SimpleQueryFilterCreator

storm-mongodb包括一个通用的QueryFilterCreator实现,称为SimpleQueryFilterCreator,可以通过给定的Tuple创建一个MongoDB查询过滤器。 QueryFilterCreator使用$ eq运算符匹配等于指定值的值。 有关查询运算符的更多信息,可以访问 https://docs.mongodb.org/manual/reference/operator/query/

  1. public class SimpleQueryFilterCreator implements QueryFilterCreator {
  2. private String field;
  3. @Override
  4. public Bson createFilter(ITuple tuple) {
  5. return Filters.eq(field, tuple.getValueByField(field));
  6. }
  7. public SimpleQueryFilterCreator withField(String field) {
  8. this.field = field;
  9. return this;
  10. }
  11. }

MongoUpdateBolt

要使用MongoUpdateBolt,你可以通过指定Mongo url,collectionName,一个QueryFilterCreator实现和一个``MongoMapper实现来将storm tuple转换成DB文档来构造一个实例。

  1. MongoMapper mapper = new SimpleMongoUpdateMapper()
  2. .withFields("word", "count");
  3. QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
  4. .withField("word");
  5. MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
  6. //if a new document should be inserted if there are no matches to the query filter
  7. //updateBolt.withUpsert(true);

或者为 QueryFilterCreator使用匿名内部类实现:

  1. MongoMapper mapper = new SimpleMongoUpdateMapper()
  2. .withFields("word", "count");
  3. QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
  4. @Override
  5. public Bson createFilter(ITuple tuple) {
  6. return Filters.gt("count", 3);
  7. }
  8. };
  9. MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
  10. //if a new document should be inserted if there are no matches to the query filter
  11. //updateBolt.withUpsert(true);