Package org.apache.storm
Class Testing
java.lang.Object
org.apache.storm.Testing
A utility that helps with testing topologies, Bolts and Spouts.
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic final class
A topology that has all messages captured and can be read later on.static interface
Simply produces a boolean to see if a specific state is true or false. -
Field Summary
Modifier and TypeFieldDescriptionstatic final int
The default amount of wall time should be spent waiting for specific conditions to happen. -
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionstatic void
advanceClusterTime
(ILocalCluster cluster, Integer secs) Simulated time wait for a cluster.static void
advanceClusterTime
(ILocalCluster cluster, Integer secs, Integer step) Simulated time wait for a cluster.captureTopology
(StormTopology topology) Rewrites a topology so that all the tuples flowing through it are captured.static Map<String,
List<FixedTuple>> completeTopology
(ILocalCluster cluster, StormTopology topology) Run a topology to completion capturing all of the messages that are emitted.static Map<String,
List<FixedTuple>> completeTopology
(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) Run a topology to completion capturing all of the messages that are emitted.static ILocalCluster
getLocalCluster
(Map<String, Object> clusterConf) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```static int
Deprecated.static <T> boolean
isEvery
(Collection<T> data, Predicate<T> pred) Convenience method for data.stream.allMatch(pred).static TrackedTopology
mkTrackedTopology
(ILocalCluster cluster, StormTopology topology) Deprecated.useTrackedTopology
directly.multiset
(Collection<T> c) Count how many times each element appears in the Collection.static <T> boolean
multiseteq
(Collection<T> a, Collection<T> b) Check if two collections are equivalent ignoring the order of elements.readTuples
(Map<String, List<FixedTuple>> results, String componentId) Get all of the tuples from a given component on the default stream.readTuples
(Map<String, List<FixedTuple>> results, String componentId, String streamId) Get all of the tuples from a given component on a given stream.static void
simulateWait
(ILocalCluster cluster) If using simulated time simulate waiting for 10 seconds.static Tuple
Create aTuple
for use with testing.static Tuple
testTuple
(List<Object> values, MkTupleParam param) Create aTuple
for use with testing.trackAndCaptureTopology
(ILocalCluster cluster, StormTopology topology) Track and capture a topology.static void
Simulated time wait for a tracked topology.static void
trackedWait
(Testing.CapturedTopology<TrackedTopology> topo, Integer amt) Simulated time wait for a tracked topology.static void
trackedWait
(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) Simulated time wait for a tracked topology.static void
trackedWait
(TrackedTopology topo) Simulated time wait for a tracked topology.static void
trackedWait
(TrackedTopology topo, Integer amt) Simulated time wait for a tracked topology.static void
trackedWait
(TrackedTopology topo, Integer amt, Integer timeoutMs) Simulated time wait for a tracked topology.static void
whileTimeout
(long timeoutMs, Testing.Condition condition, Runnable body) Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static void
whileTimeout
(Testing.Condition condition, Runnable body) Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static void
withLocalCluster
(MkClusterParam param, TestJob code) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```static void
withLocalCluster
(TestJob code) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ... } ```static void
withSimulatedTime
(Runnable code) Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } ```static void
withSimulatedTimeLocalCluster
(MkClusterParam param, TestJob code) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } ```static void
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } ```static void
withTrackedCluster
(MkClusterParam param, TestJob code) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } ```static void
withTrackedCluster
(TestJob code) Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } ```
-
Field Details
-
TEST_TIMEOUT_MS
public static final int TEST_TIMEOUT_MSThe default amount of wall time should be spent waiting for specific conditions to happen. Default is 10 seconds unless the environment variable STORM_TEST_TIMEOUT_MS is set.
-
-
Constructor Details
-
Testing
public Testing()
-
-
Method Details
-
whileTimeout
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
condition
- what we are waiting forbody
- what to run in the loop- Throws:
AssertionError
- if the loop timed out.
-
whileTimeout
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
timeoutMs
- the number of ms to wait before timing out.condition
- what we are waiting forbody
- what to run in the loop- Throws:
AssertionError
- if the loop timed out.
-
isEvery
Convenience method for data.stream.allMatch(pred). -
withSimulatedTime
Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } ```Run with simulated time.- Parameters:
code
- what to run
-
withLocalCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withLocalCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
getLocalCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
clusterConf
- some configs to set in the cluster
-
withSimulatedTimeLocalCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withSimulatedTimeLocalCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } ```Run with a local cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
withTrackedCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } ```Run with a local cluster.- Parameters:
code
- what to run
-
withTrackedCluster
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } ```Run with a local tracked cluster.- Parameters:
param
- configs to set in the clustercode
- what to run
-
globalAmt
Deprecated.In a tracked topology some metrics are tracked. This provides a way to get those metrics. This is intended mostly for internal testing.- Parameters:
id
- the id of the tracked clusterkey
- the name of the metric to get.- Returns:
- the metric
-
trackAndCaptureTopology
public static Testing.CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology) Track and capture a topology. This is intended mostly for internal testing. -
captureTopology
Rewrites a topology so that all the tuples flowing through it are captured.- Parameters:
topology
- the topology to rewrite- Returns:
- the modified topology and a new Bolt that can retrieve the captured tuples.
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, org.apache.storm.thrift.TException Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpout
.- Parameters:
cluster
- the cluster to submit the topology totopology
- the topology itself- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException
- on any error from nimbusInterruptedException
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) throws org.apache.storm.thrift.TException, InterruptedException Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpout
or are overwritten by MockedSources in param- Parameters:
cluster
- the cluster to submit the topology totopology
- the topology itselfparam
- parameters to describe how to complete a topology- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException
- on any error from nimbus.InterruptedException
-
simulateWait
If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.- Throws:
InterruptedException
-
readTuples
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId) Get all of the tuples from a given component on the default stream.- Parameters:
results
- the results of running a completed topologycomponentId
- the id of the component to look at- Returns:
- a list of the tuple values.
-
readTuples
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId, String streamId) Get all of the tuples from a given component on a given stream.- Parameters:
results
- the results of running a completed topologycomponentId
- the id of the component to look atstreamId
- the id of the stream to look for.- Returns:
- a list of the tuple values.
-
mkTrackedTopology
@Deprecated public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) Deprecated.useTrackedTopology
directly.Create a tracked topology. -
trackedWait
Simulated time wait for a tracked topology. This is intended for internal testing. -
trackedWait
Simulated time wait for a tracked topology. This is intended for internal testing. -
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) Simulated time wait for a tracked topology. This is intended for internal testing. -
trackedWait
Simulated time wait for a tracked topology. This is intended for internal testing. -
trackedWait
Simulated time wait for a tracked topology. This is intended for internal testing. -
trackedWait
Simulated time wait for a tracked topology. This is intended for internal testing. -
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
multiset
Count how many times each element appears in the Collection.- Parameters:
c
- a collection of values- Returns:
- a map of the unique values in c to the count of those values.
-
multiseteq
Check if two collections are equivalent ignoring the order of elements. -
testTuple
Create aTuple
for use with testing.- Parameters:
values
- the values to appear in the tuple
-
testTuple
Create aTuple
for use with testing.- Parameters:
values
- the values to appear in the tupleparam
- parametrs describing more details about the tuple
-