Version: 2.2.0

Storm OpenTSDB Bolt and TridentState

OpenTSDB offers a scalable and highly available storage for time series data. It consists of a Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the configured HBase cluster to push/query the data.

Time series data point consists of: - a metric name. - a UNIX timestamp (seconds or milliseconds since Epoch). - a value (64 bit integer or single-precision floating point value). - a set of tags (key-value pairs) that describe the time series the point belongs to.

Storm bolt and trident state creates the above time series data from a tuple based on the given TupleMetricPointMapper

This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.

Time series data points are written with at-least-once guarantee and duplicate data points should be handled as mentioned here in OpenTSDB.

Examples

Core Bolt

Below example describes the usage of core bolt which is org.apache.storm.opentsdb.bolt.OpenTsdbBolt


        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
        openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");

Trident State


        final OpenTsdbStateFactory openTsdbStateFactory =
                new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
                        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
        TridentTopology tridentTopology = new TridentTopology();

        final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());

        stream.peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                LOG.info("########### Received tuple: [{}]", input);
            }
        }).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());