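Storm Druid Bolt and TridentState
This module provides the core Storm and Trident bolt implementations for writing data to a Druid data store. The implementation uses Druid's Tranquility library to send messages to Druid.
Some of the implementation details are borrowed from the existing Tranquility Storm Bolt. This new bolt adds support for the latest Storm release and keeps the bolt maintained in the Storm repository.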
Core Bolt
The example below shows how to use the core bolt, org.apache.storm.druid.bolt.DruidBeamBolt. By default, the bolt expects to receive tuples in which the "event" field carries your event type. You can change this logic by implementing the ITupleDruidEventMapper interface.
    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
    DruidConfig druidConfig = DruidConfig.newBuilder()
            .discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID)
            .build();
    // Reads the event from the tuple's "event" field (TupleDruidEventMapper.DEFAULT_FIELD_NAME)
    ITupleDruidEventMapper<Map<String, Object>> eventMapper =
            new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
    DruidBeamBolt<Map<String, Object>> druidBolt =
            new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
    topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
    // Consume the tuples the Druid bolt emits on its discard stream
    topologyBuilder.setBolt("printer-bolt", new PrinterBolt())
            .shuffleGrouping("druid-bolt", druidConfig.getDiscardStreamId());
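To illustrate what an event mapper does, here is a minimal, Storm-free sketch. It assumes a tuple can be modelled as a plain Map<String, Object> of field names to values; EventMapper, FieldEventMapper and ClickEventMapper are hypothetical stand-ins, not the storm-druid API, and the tuple field names "ts", "pub" and "adv" are invented for the example. FieldEventMapper mirrors the default behaviour (return the value of one named field), while ClickEventMapper shows a custom mapping that assembles the event from several fields.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a "tuple" is modelled as a Map of field name -> value.
// None of these types belong to storm-druid; they only mirror the mapper idea.
class EventMapperSketch {

    // Hypothetical stand-in for a tuple-to-event mapper (serializable, like bolt state).
    interface EventMapper<E> extends Serializable {
        E getEvent(Map<String, Object> tuple);
    }

    // Default-style mapper: returns whatever is stored under a single field name.
    static final class FieldEventMapper<E> implements EventMapper<E> {
        private final String fieldName;

        FieldEventMapper(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E getEvent(Map<String, Object> tuple) {
            return (E) tuple.get(fieldName);
        }
    }

    // Custom mapper: builds a Druid-style event map from several (hypothetical) tuple fields.
    static final class ClickEventMapper implements EventMapper<Map<String, Object>> {
        @Override
        public Map<String, Object> getEvent(Map<String, Object> tuple) {
            Map<String, Object> event = new HashMap<>();
            event.put("timestamp", tuple.get("ts"));
            event.put("publisher", tuple.get("pub"));
            event.put("advertiser", tuple.get("adv"));
            return event;
        }
    }
}
```

Keeping the mapper separate from the bolt means the bolt never needs to know the upstream tuple layout; only the mapper changes when the layout does.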
Trident State
    DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
    ITupleDruidEventMapper<Map<String, Object>> eventMapper =
            new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));

    // Log each tuple, then persist the "event" field of every batch through the Druid state
    stream.peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            LOG.info("########### Received tuple: [{}]", input);
        }
    }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper),
            new Fields("event"), new DruidBeamStateUpdater());
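In the Trident example, partitionPersist hands each batch of tuples (restricted to the "event" field) to the state updater. The sketch below is a simplified, Storm-free model of that batch-at-a-time flow, offered only as an illustration: BatchEventState, EventSink and updateState are hypothetical names, each tuple is assumed to be a Map<String, Object>, and the real DruidBeamState may behave differently (for example in how it handles send failures).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free model of a batch-oriented state update.
class BatchEventState {

    // Hypothetical destination standing in for the Druid beam.
    interface EventSink {
        void sendAll(List<Map<String, Object>> events);
    }

    private final String eventField;
    private final EventSink sink;

    BatchEventState(String eventField, EventSink sink) {
        this.eventField = eventField;
        this.sink = sink;
    }

    // Maps every tuple in the batch to its event, then hands the whole batch to the sink at once.
    // Returns the number of events that were sent.
    @SuppressWarnings("unchecked")
    int updateState(List<Map<String, Object>> batch) {
        List<Map<String, Object>> events = new ArrayList<>(batch.size());
        for (Map<String, Object> tuple : batch) {
            Object event = tuple.get(eventField);
            if (event != null) { // skip tuples that carry no event
                events.add((Map<String, Object>) event);
            }
        }
        sink.sendAll(events);
        return events.size();
    }
}
```

Sending per batch rather than per tuple fits Trident's model, where a state update covers a whole batch.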
Sample Factory Implementation
The Druid bolt must be supplied with a BeamFactory. You can implement one using the DruidBeams builder's "buildBeam()" method. See the Configuration documentation for details. For more details, refer to the Tranquility library docs.
    public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {

        @Override
        public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {

            final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
            final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
            final String dataSource = "test"; // The name of the ingested datasource. Datasources can be thought of as tables.
            final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
            List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                    new CountAggregatorFactory("click")
            );
            // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
            final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {
                @Override
                public DateTime timestamp(Map<String, Object> theMap) {
                    return new DateTime(theMap.get("timestamp"));
                }
            };

            // Tranquility uses ZooKeeper (through Curator) for coordination.
            final CuratorFramework curator = CuratorFrameworkFactory
                    .builder()
                    .connectString((String) conf.get("druid.tranquility.zk.connect")) // take config from storm conf
                    .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
                    .build();
            curator.start();

            // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
            // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
            final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

            // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
            // done with Jackson. If you want to provide an alternate serializer, you can provide your own via .objectWriter(...).
            // In this case, we won't provide one, so we're just using Jackson.
            final Beam<Map<String, Object>> beam = DruidBeams
                    .builder(timestamper)
                    .curator(curator)
                    .discoveryPath(discoveryPath)
                    .location(DruidLocation.create(indexService, dataSource))
                    .timestampSpec(timestampSpec)
                    .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
                    .tuning(
                            ClusteredBeamTuning
                                    .builder()
                                    .segmentGranularity(Granularity.HOUR)
                                    .windowPeriod(new Period("PT10M"))
                                    .partitions(1)
                                    .replicants(1)
                                    .build()
                    )
                    .druidBeamConfig(
                            DruidBeamConfig
                                    .builder()
                                    .indexRetryPeriod(new Period("PT10M"))
                                    .build()
                    )
                    .buildBeam();

            return beam;
        }
    }
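The sample factory configures rollup: events are bucketed at QueryGranularities.MINUTE, grouped by the "publisher" and "advertiser" dimensions, and counted by a CountAggregatorFactory named "click". As a rough illustration of what that rollup means for stored rows (not how Druid implements it), the following sketch truncates each event's "timestamp" to the minute and counts events per (minute, publisher, advertiser) key. RollupSketch and its methods are hypothetical; it assumes timestamps arrive either as ISO8601 strings with a zone offset or as epoch milliseconds, and parses them with java.time.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of minute-granularity rollup with a count aggregator.
class RollupSketch {

    // Accepts an ISO8601 string with offset (e.g. "2016-01-01T00:00:05Z") or epoch milliseconds.
    static Instant parseTimestamp(Object value) {
        if (value instanceof Number) {
            return Instant.ofEpochMilli(((Number) value).longValue());
        }
        return OffsetDateTime.parse(value.toString()).toInstant();
    }

    // Groups events by (minute, publisher, advertiser) and counts them, like a "click" count metric.
    static Map<String, Long> rollup(List<Map<String, Object>> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map<String, Object> event : events) {
            Instant minute = parseTimestamp(event.get("timestamp")).truncatedTo(ChronoUnit.MINUTES);
            String key = minute + "|" + event.get("publisher") + "|" + event.get("advertiser");
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

For example, three events for the same publisher and advertiser at 00:00:05, 00:00:50 and 00:01:10 on 2016-01-01 (UTC) collapse into two rows: the 00:00 row with a count of 2 and the 00:01 row with a count of 1.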
Example code is available here.
