Class Testing

java.lang.Object
org.apache.storm.Testing

public class Testing extends Object
A utility that helps with testing topologies, Bolts and Spouts.
  • Field Details

    • TEST_TIMEOUT_MS

      public static final int TEST_TIMEOUT_MS
      The default amount of wall-clock time to spend waiting for a specific condition to happen. The default is 10 seconds unless the environment variable STORM_TEST_TIMEOUT_MS is set.
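      A minimal sketch of how such a default could be resolved. It is not Storm's implementation: the class name, the helper resolveTimeoutMs, and the fallback to the default on a missing or unparsable value are assumptions made for illustration.

```java
final class TestTimeoutSketch {
    // 10 seconds, matching the documented default.
    static final int DEFAULT_TIMEOUT_MS = 10_000;

    /** Parses a raw setting; falls back to the default when it is absent or not a number (assumed behavior). */
    static int resolveTimeoutMs(String raw) {
        if (raw == null) {
            return DEFAULT_TIMEOUT_MS;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_TIMEOUT_MS;
        }
    }

    public static void main(String[] args) {
        // Reads the environment variable named in the field description.
        int timeoutMs = resolveTimeoutMs(System.getenv("STORM_TEST_TIMEOUT_MS"));
        System.out.println("timeout ms: " + timeoutMs);
    }
}
```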
  • Constructor Details

    • Testing

      public Testing()
  • Method Details

    • whileTimeout

      public static void whileTimeout(Testing.Condition condition, Runnable body)
      Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.
      Parameters:
      condition - what we are waiting for
      body - what to run in the loop
      Throws:
      AssertionError - if the loop timed out.
    • whileTimeout

      public static void whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)
      Continue to execute body repeatedly until condition is true or timeoutMs has passed.
      Parameters:
      timeoutMs - the number of ms to wait before timing out.
      condition - what we are waiting for
      body - what to run in the loop
      Throws:
      AssertionError - if the loop timed out.
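      The polling pattern described for whileTimeout can be sketched with the standard library alone. This is a standalone illustration, not Storm's code: pollUntil and its BooleanSupplier parameter are hypothetical names, and the loop follows the wording above (stop once the condition reports true). Check the Storm source for how Testing.Condition's return value is actually interpreted before relying on it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollSketch {
    /** Runs body repeatedly until done reports true; throws AssertionError once timeoutMs has elapsed. */
    static void pollUntil(long timeoutMs, BooleanSupplier done, Runnable body) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!done.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out after " + timeoutMs + " ms");
            }
            body.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // The body increments the counter; the loop stops once it reaches 3.
        pollUntil(1_000, () -> counter.get() >= 3, counter::incrementAndGet);
        System.out.println(counter.get());
    }
}
```

      In a real test the body would usually sleep briefly or advance simulated time rather than spin.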
    • isEvery

      public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred)
      Convenience method for data.stream().allMatch(pred).
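      A small stand-alone equivalent, assuming the method does nothing beyond the stream call named above. The class name IsEverySketch is hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

final class IsEverySketch {
    /** True when every element satisfies pred; an empty collection also yields true. */
    static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
        return data.stream().allMatch(pred);
    }

    public static void main(String[] args) {
        List<Integer> evens = List.of(2, 4, 6);
        System.out.println(isEvery(evens, x -> x % 2 == 0));
    }
}
```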
    • withSimulatedTime

      @Deprecated public static void withSimulatedTime(Runnable code)
      Deprecated.
      use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } ```
      Run with simulated time.
      Parameters:
      code - what to run
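      The try-with-resources form recommended in the deprecation note guarantees that the resource is closed even when the test body throws. The sketch below shows that property with a hypothetical stand-in, FakeScope. It is not Time.SimulatedTime; it only records the order of events.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopedResourceSketch {
    static final List<String> EVENTS = new ArrayList<>();

    /** Hypothetical stand-in for a scoped test resource; records when it is opened and closed. */
    static final class FakeScope implements AutoCloseable {
        FakeScope() {
            EVENTS.add("open");
        }

        @Override
        public void close() {
            EVENTS.add("close");
        }
    }

    /** Runs a test body inside the scope; close() is called even if the body throws. */
    static void runScoped(boolean fail) {
        try (FakeScope scope = new FakeScope()) {
            EVENTS.add("body");
            if (fail) {
                throw new IllegalStateException("simulated test failure");
            }
        }
    }

    public static void main(String[] args) {
        try {
            runScoped(true);
        } catch (IllegalStateException expected) {
            // The failure propagates, but the scope has already been closed.
        }
        System.out.println(EVENTS);
    }
}
```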
    • withLocalCluster

      @Deprecated public static void withLocalCluster(TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster()) { ... } ```
      Run with a local cluster.
      Parameters:
      code - what to run
    • withLocalCluster

      @Deprecated public static void withLocalCluster(MkClusterParam param, TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```
      Run with a local cluster.
      Parameters:
      param - configs to set in the cluster
      code - what to run
    • getLocalCluster

      @Deprecated public static ILocalCluster getLocalCluster(Map<String,Object> clusterConf)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```
      Create a local cluster with the given configs.
      Parameters:
      clusterConf - some configs to set in the cluster
    • withSimulatedTimeLocalCluster

      @Deprecated public static void withSimulatedTimeLocalCluster(TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } ```
      Run with a local cluster that uses simulated time.
      Parameters:
      code - what to run
    • withSimulatedTimeLocalCluster

      @Deprecated public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } ```
      Run with a local cluster that uses simulated time.
      Parameters:
      param - configs to set in the cluster
      code - what to run
    • withTrackedCluster

      @Deprecated public static void withTrackedCluster(TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } ```
      Run with a local tracked cluster.
      Parameters:
      code - what to run
    • withTrackedCluster

      @Deprecated public static void withTrackedCluster(MkClusterParam param, TestJob code)
      Deprecated.
      use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } ```
      Run with a local tracked cluster.
      Parameters:
      param - configs to set in the cluster
      code - what to run
    • globalAmt

      @Deprecated public static int globalAmt(String id, String key)
      Deprecated.
      In a tracked topology some metrics are tracked. This provides a way to get those metrics. This is intended mostly for internal testing.
      Parameters:
      id - the id of the tracked cluster
      key - the name of the metric to get.
      Returns:
      the metric
    • trackAndCaptureTopology

      public static Testing.CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)
      Track and capture a topology. This is intended mostly for internal testing.
    • captureTopology

      public static Testing.CapturedTopology<StormTopology> captureTopology(StormTopology topology)
      Rewrites a topology so that all the tuples flowing through it are captured.
      Parameters:
      topology - the topology to rewrite
      Returns:
      the modified topology and a new Bolt that can retrieve the captured tuples.
    • completeTopology

      public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, org.apache.storm.thrift.TException
      Run a topology to completion, capturing all of the messages that are emitted. This only works when all of the spouts are instances of CompletableSpout.
      Parameters:
      cluster - the cluster to submit the topology to
      topology - the topology itself
      Returns:
      a map of the component to the list of tuples it emitted
      Throws:
      org.apache.storm.thrift.TException - on any error from nimbus
      InterruptedException
    • completeTopology

      public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) throws org.apache.storm.thrift.TException, InterruptedException
      Run a topology to completion, capturing all of the messages that are emitted. This only works when all of the spouts are instances of CompletableSpout or are overwritten by MockedSources in param.
      Parameters:
      cluster - the cluster to submit the topology to
      topology - the topology itself
      param - parameters to describe how to complete a topology
      Returns:
      a map of the component to the list of tuples it emitted
      Throws:
      org.apache.storm.thrift.TException - on any error from nimbus.
      InterruptedException
    • simulateWait

      public static void simulateWait(ILocalCluster cluster) throws InterruptedException
      If using simulated time, simulate waiting for 10 seconds. This is intended for internal testing only.
      Throws:
      InterruptedException
    • readTuples

      public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId)
      Get all of the tuples from a given component on the default stream.
      Parameters:
      results - the results of running a completed topology
      componentId - the id of the component to look at
      Returns:
      a list of the tuple values.
    • readTuples

      public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)
      Get all of the tuples from a given component on a given stream.
      Parameters:
      results - the results of running a completed topology
      componentId - the id of the component to look at
      streamId - the id of the stream to look for.
      Returns:
      a list of the tuple values.
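      The filtering that readTuples describes can be sketched as follows. CapturedTuple is a hypothetical stand-in for FixedTuple (a stream id plus the emitted values), and returning an empty list for an unknown component is an assumption, not documented behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReadTuplesSketch {
    /** Hypothetical stand-in for FixedTuple: the stream a tuple was emitted on and its values. */
    static final class CapturedTuple {
        final String stream;
        final List<Object> values;

        CapturedTuple(String stream, List<Object> values) {
            this.stream = stream;
            this.values = values;
        }
    }

    /** Collects the values of every tuple that componentId emitted on streamId. */
    static List<List<Object>> readTuples(Map<String, List<CapturedTuple>> results,
                                         String componentId, String streamId) {
        List<List<Object>> out = new ArrayList<>();
        List<CapturedTuple> emitted = results.get(componentId);
        if (emitted != null) {
            for (CapturedTuple tuple : emitted) {
                if (streamId.equals(tuple.stream)) {
                    out.add(tuple.values);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<CapturedTuple>> results = new HashMap<>();
        results.put("spout", Arrays.asList(
            new CapturedTuple("default", Arrays.<Object>asList("hello")),
            new CapturedTuple("errors", Arrays.<Object>asList("oops"))));
        // Only the tuple emitted on the "default" stream is returned.
        System.out.println(readTuples(results, "spout", "default"));
    }
}
```

      With the real API, the results map would come from completeTopology rather than being built by hand.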
    • mkTrackedTopology

      @Deprecated public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology)
      Deprecated.
      use TrackedTopology directly.
      Create a tracked topology.
    • trackedWait

      public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • trackedWait

      public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • trackedWait

      public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • trackedWait

      public static void trackedWait(TrackedTopology topo)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • trackedWait

      public static void trackedWait(TrackedTopology topo, Integer amt)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • trackedWait

      public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)
      Simulated time wait for a tracked topology. This is intended for internal testing.
    • advanceClusterTime

      public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException
      Simulated time wait for a cluster. This is intended for internal testing.
      Throws:
      InterruptedException
    • advanceClusterTime

      public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException
      Simulated time wait for a cluster. This is intended for internal testing.
      Throws:
      InterruptedException
    • multiset

      public static <T> Map<T,Integer> multiset(Collection<T> c)
      Count how many times each element appears in the Collection.
      Parameters:
      c - a collection of values
      Returns:
      a map of the unique values in c to the count of those values.
    • multiseteq

      public static <T> boolean multiseteq(Collection<T> a, Collection<T> b)
      Check if two collections are equivalent ignoring the order of elements.
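      Counting elements and comparing the counts is one straightforward way to get the behavior described for multiset and multiseteq. The sketch below is a standard-library illustration under that assumption, with hypothetical names. It is not Storm's code.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultisetSketch {
    /** Maps each distinct element of c to the number of times it occurs. */
    static <T> Map<T, Integer> multiset(Collection<T> c) {
        Map<T, Integer> counts = new HashMap<>();
        for (T item : c) {
            counts.merge(item, 1, Integer::sum);
        }
        return counts;
    }

    /** True when a and b contain the same elements with the same multiplicities, in any order. */
    static <T> boolean multisetEq(Collection<T> a, Collection<T> b) {
        return multiset(a).equals(multiset(b));
    }

    public static void main(String[] args) {
        System.out.println(multiset(List.of("a", "b", "a")).get("a"));
        System.out.println(multisetEq(List.of(1, 2, 2), List.of(2, 1, 2)));
    }
}
```

      This kind of comparison is useful for asserting on emitted tuples, whose order is often not deterministic across runs.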
    • testTuple

      public static Tuple testTuple(List<Object> values)
      Create a Tuple for use with testing.
      Parameters:
      values - the values to appear in the tuple
    • testTuple

      public static Tuple testTuple(List<Object> values, MkTupleParam param)
      Create a Tuple for use with testing.
      Parameters:
      values - the values to appear in the tuple
      param - parameters describing more details about the tuple