Storm OpenTSDB 集成

Storm OpenTSDB Bolt 和 TridentState

OpenTSDB 为时间序列数据提供了可扩展且高可用性的存储. 它由 Time Series Daemon (TSD) servers 以及命令行工具组成. 每个 TSD 连接到配置的 HBase 集群以 push/query(推送/查询) 数据.

时间序列数据点包括: - a metric name. - a UNIX timestamp (seconds or milliseconds since Epoch). - a value (64 bit integer or single-precision floating point value). - a set of tags (key-value pairs) that describe the time series the point belongs to.

Storm bolt 和 trident state 从一个基于给定的 TupleMetricPointMapper 的 tuple 中创建了上面的时间序列数据.

该模块提供了 core Storm 和 Trident bolt 实现, 用户将数据写入 OpenTSDB.

时间序列数据点被写入时有 at-least-once(至少一次)的语义保证, 并且重复的数据点应该像 OpenTSDB 的 这里 提及的一样来处理.

示例

Core Bolt

下面的示例描述了 org.apache.storm.opentsdb.bolt.OpenTsdbBolt cord bolt 的使用方法

  1. OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
  2. final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
  3. openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
  4. topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");

Trident State

  1. final OpenTsdbStateFactory openTsdbStateFactory =
  2. new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
  3. Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
  4. TridentTopology tridentTopology = new TridentTopology();
  5. final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());
  6. stream.peek(new Consumer() {
  7. @Override
  8. public void accept(TridentTuple input) {
  9. LOG.info("########### Received tuple: [{}]", input);
  10. }
  11. }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());