Storm Druid Bolt and Trident State

This module provides core Storm and Trident bolt implementations for writing data into the Druid data store. The implementation uses Druid's Tranquility library to send messages to Druid.

Some of the implementation details are borrowed from the existing Tranquility Storm Bolt. This new bolt adds support for the latest Storm releases and is maintained in the Storm repository.

Core Bolt

The example below shows how to use the core bolt, org.apache.storm.druid.bolt.DruidBeamBolt. By default the bolt expects to receive tuples in which the "event" field carries your event object. This logic can be changed by implementing the ITupleDruidEventMapper interface; a sketch of a custom mapper follows the example below.

    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
    DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
    DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);

    topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
    topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt", druidConfig.getDiscardStreamId());

Trident State

    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
    ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));

    stream.peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            LOG.info("########### Received tuple: [{}]", input);
        }
    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
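
To run the Trident example end to end, the topology still needs to be built and submitted. The following is a minimal local-mode sketch; the topology name and ZooKeeper connect string value are placeholders, while the configuration key matches the one read from the Storm configuration in the sample factory below:

    Config conf = new Config();
    conf.put("druid.tranquility.zk.connect", "localhost:2181"); // read from the Storm conf by SampleDruidBeamFactoryImpl
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("druid-trident-example", conf, tridentTopology.build());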

Sample Factory Implementation

The Druid bolt must be configured with a BeamFactory. You can implement one of these using the DruidBeams builder's "buildBeam()" method. See the Configuration documentation for details. For more details, refer to the Tranquility library docs.

    public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {

        @Override
        public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {

            final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
            final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
            final String dataSource = "test"; // The name of the ingested datasource. Datasources can be thought of as tables.
            final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
            List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                    new CountAggregatorFactory("click")
            );

            // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
            final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>()
            {
                @Override
                public DateTime timestamp(Map<String, Object> theMap)
                {
                    return new DateTime(theMap.get("timestamp"));
                }
            };

            // Tranquility uses ZooKeeper (through Curator) for coordination.
            final CuratorFramework curator = CuratorFrameworkFactory
                    .builder()
                    .connectString((String) conf.get("druid.tranquility.zk.connect")) // take config from storm conf
                    .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
                    .build();
            curator.start();

            // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
            // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
            final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

            // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
            // done with Jackson. If you want to provide an alternate serializer, you can provide your own via .objectWriter(...).
            // In this case, we won't provide one, so we're just using Jackson.
            final Beam<Map<String, Object>> beam = DruidBeams
                    .builder(timestamper)
                    .curator(curator)
                    .discoveryPath(discoveryPath)
                    .location(DruidLocation.create(indexService, dataSource))
                    .timestampSpec(timestampSpec)
                    .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
                    .tuning(
                            ClusteredBeamTuning
                                    .builder()
                                    .segmentGranularity(Granularity.HOUR)
                                    .windowPeriod(new Period("PT10M"))
                                    .partitions(1)
                                    .replicants(1)
                                    .build()
                    )
                    .druidBeamConfig(
                            DruidBeamConfig
                                    .builder()
                                    .indexRetryPeriod(new Period("PT10M"))
                                    .build())
                    .buildBeam();

            return beam;
        }
    }

Example code is available here.