public class Utils extends Object
Modifier and Type | Class and Description |
---|---|
static class |
Utils.SmartThread
A thread that can answer if it is sleeping in the case of simulated time.
|
static class |
Utils.UptimeComputer |
Modifier and Type | Field and Description |
---|---|
static Pattern |
BLOB_KEY_PATTERN |
static String |
DEFAULT_STREAM_ID |
static org.slf4j.Logger |
LOG |
Constructor and Description |
---|
Utils() |
Modifier and Type | Method and Description |
---|---|
static void |
addShutdownHookWithDelayedForceKill(Runnable func,
int numSecs)
Adds the user supplied function as a shutdown hook for cleanup.
|
static void |
addShutdownHookWithForceKillIn1Sec(Runnable func)
Adds the user supplied function as a shutdown hook for cleanup.
|
static StormTopology |
addVersions(StormTopology topology)
Add version information to the given topology.
|
static Utils.SmartThread |
asyncLoop(Callable afn)
Convenience method used when only the function is given.
|
static Utils.SmartThread |
asyncLoop(Callable afn,
boolean isDaemon,
Thread.UncaughtExceptionHandler eh,
int priority,
boolean isFactory,
boolean startImmediately,
String threadName)
Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.
|
static Utils.SmartThread |
asyncLoop(Callable afn,
String threadName,
Thread.UncaughtExceptionHandler eh)
Convenience method used when only the function and name suffix are given.
|
static long |
bitXor(Long a,
Long b) |
static long |
bitXorVals(List<Long> coll) |
static boolean |
checkDirExists(String dir) |
static boolean |
checkFileExists(String path) |
static <V> ArrayList<V> |
convertToArray(Map<Integer,V> srcMap,
int start) |
static Thread.UncaughtExceptionHandler |
createDefaultUncaughtExceptionHandler() |
static Thread.UncaughtExceptionHandler |
createWorkerUncaughtExceptionHandler() |
static <T> T |
deserialize(byte[] serialized,
Class<T> clazz) |
static <T> T |
deserializeFromString(String str,
Class<T> clazz)
Deserialize an object stored in a string.
|
static boolean |
exceptionCauseIsInstanceOf(Class klass,
Throwable throwable)
Checks if a throwable is an instance of a particular class.
|
static void |
exitProcess(int val,
String msg) |
static Map<String,Object> |
findAndReadConfigFile(String name) |
static Map<String,Object> |
findAndReadConfigFile(String name,
boolean mustExist) |
static List<List<String>> |
findComponentCycles(StormTopology topology,
String topoId)
Find and return components cycles in the topology graph when starting from spout.
|
static <T> T |
findOne(IPredicate<T> pred,
Collection<T> coll)
Find the first item of coll for which pred.test(…) returns true.
|
static <T,U> T |
findOne(IPredicate<T> pred,
Map<U,T> map) |
static List<URL> |
findResources(String name) |
static void |
forceDelete(String path)
Deletes a file or directory and its contents if it exists.
|
protected void |
forceDeleteImpl(String path) |
static Map<String,Object> |
fromCompressedJsonConf(byte[] serialized) |
static <S,T> T |
get(Map<S,T> m,
S key,
T def) |
static NavigableMap<String,IVersionInfo> |
getAlternativeVersionsMap(Map<String,Object> conf)
Get a mapping of the configured supported versions of storm to their actual versions.
|
static int |
getAvailablePort()
Shortcut to calling
getAvailablePort(int) with 0 as the preferred port. |
static int |
getAvailablePort(int preferredPort)
Gets an available port.
|
static ClientBlobStore |
getClientBlobStore(Map<String,Object> conf) |
static <T> T |
getCompatibleVersion(NavigableMap<SimpleVersion,T> versionedMap,
SimpleVersion desiredVersion,
String what,
T defaultValue) |
static ComponentCommon |
getComponentCommon(StormTopology topology,
String id) |
static Map<String,Object> |
getConfigFromClasspath(List<String> cp,
Map<String,Object> conf) |
static Object |
getConfiguredClass(Map<String,Object> conf,
Object configKey)
Return a new instance of a pluggable specified in the conf.
|
static NavigableMap<SimpleVersion,List<String>> |
getConfiguredClasspathVersions(Map<String,Object> conf,
List<String> currentClassPath)
Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
|
static NavigableMap<SimpleVersion,String> |
getConfiguredWorkerLogWriterVersions(Map<String,Object> conf)
Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
|
static NavigableMap<SimpleVersion,String> |
getConfiguredWorkerMainVersions(Map<String,Object> conf)
Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
|
static GlobalStreamId |
getGlobalStreamId(String componentId,
String streamId) |
static List<String> |
getRepeat(List<String> list) |
static Object |
getSetComponentObject(ComponentObject obj) |
static org.apache.storm.shade.org.apache.zookeeper.data.ACL |
getSuperUserAcl(Map<String,Object> conf)
Get the ACL for nimbus/supervisor.
|
static String |
getTopologyId(String name,
Nimbus.Iface client) |
static TopologyInfo |
getTopologyInfo(String name,
String asUser,
Map<String,Object> topoConf) |
static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> |
getWorkerACL(Map<String,Object> conf)
Get the ZK ACLs that a worker should use when writing to ZK.
|
static byte[] |
gunzip(byte[] data) |
static byte[] |
gzip(byte[] data) |
static void |
handleUncaughtException(Throwable t) |
static void |
handleUncaughtException(Throwable t,
Set<Class<?>> allowedExceptions,
boolean worker)
Handles uncaught exceptions.
|
static void |
handleWorkerUncaughtException(Throwable t) |
static String |
hostname()
Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
|
protected String |
hostnameImpl() |
static TreeMap<Integer,Integer> |
integerDivided(int sum,
int numPieces) |
static boolean |
isLocalhostAddress(String address) |
static boolean |
isOnWindows() |
static boolean |
isSystemId(String id) |
static boolean |
isValidConf(Map<String,Object> topoConfIn) |
static boolean |
isValidKey(String key)
Validates blob key.
|
static boolean |
isZkAuthenticationConfiguredStormServer(Map<String,Object> conf)
Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
|
static boolean |
isZkAuthenticationConfiguredTopology(Map<String,Object> conf)
Is the topology configured to have ZooKeeper authentication.
|
static <T> T |
javaDeserialize(byte[] serialized,
Class<T> clazz) |
static byte[] |
javaSerialize(Object obj) |
static <T> String |
join(Iterable<T> coll,
String sep) |
static String |
localHostname() |
protected String |
localHostnameImpl() |
static Utils.UptimeComputer |
makeUptimeComputer() |
Utils.UptimeComputer |
makeUptimeComputerImpl() |
static String |
memoizedLocalHostname() |
static <K,V> Map<K,V> |
merge(Map<? extends K,? extends V> first,
Map<? extends K,? extends V> other) |
static Runnable |
mkSuicideFn() |
static double |
nullToZero(Double v) |
static <V> V |
OR(V a,
V b)
a or b the first one that is not null.
|
static Map<String,Object> |
parseJson(String json) |
static Double |
parseJvmHeapMemByChildOpts(List<String> options,
Double defaultValue)
parses the arguments to extract jvm heap memory size in MB.
|
static org.apache.storm.shade.org.apache.zookeeper.data.Id |
parseZkId(String id,
String configName) |
static <T> List<List<T>> |
partitionFixed(int maxNumChunks,
Collection<T> coll)
Fills up chunks out of a collection (given a maximum amount of chunks).
|
static String |
processPid()
Get process PID.
|
static void |
readAndLogStream(String prefix,
InputStream in) |
static Map<String,Object> |
readCommandLineOpts() |
static Map<String,Object> |
readDefaultConfig() |
static Map<String,Object> |
readStormConfig() |
static Object |
readYamlFile(String yamlFile) |
static Map<String,Object> |
redactValue(Map<String,Object> m,
String key)
Creates a new map with a string value in the map replaced with an equivalently-lengthed string of ‘#’.
|
static void |
resetClassLoaderForJavaDeSerialize() |
static Map<Object,List<Object>> |
reverseMap(List<List<Object>> listSeq)
“[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 [:c]}” Reverses an assoc-list style Map like reverseMap(Map…)
|
static <K,V> HashMap<V,List<K>> |
reverseMap(Map<K,V> map)
“{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}”. |
static long |
secureRandomLong() |
static byte[] |
serialize(Object obj) |
static String |
serializeToString(Object obj)
Serialize an object using the configured serialization and then base64 encode it into a string.
|
static void |
setClassLoaderForJavaDeSerialize(ClassLoader cl) |
static Utils |
setInstance(Utils u)
Provide an instance of this class for delegates to use.
|
static void |
setupDefaultUncaughtExceptionHandler() |
static void |
setupWorkerUncaughtExceptionHandler() |
static void |
sleep(long millis) |
static void |
sleepNoSimulation(long millis) |
static String |
threadDump()
Gets some information, including stack trace, for a running thread.
|
static <T> T |
thriftDeserialize(Class<T> c,
byte[] b) |
static <T> T |
thriftDeserialize(Class<T> c,
byte[] b,
int offset,
int length) |
static byte[] |
thriftSerialize(org.apache.storm.thrift.TBase t) |
static byte[] |
toByteArray(ByteBuffer buffer) |
static byte[] |
toCompressedJsonConf(Map<String,Object> topoConf) |
static int |
toPositive(int number)
A cheap way to deterministically convert a number to a positive value.
|
static List<Object> |
tuple(Object... values) |
static <T extends Throwable> void |
unwrapAndThrow(Class<T> klass,
Throwable t) |
static <T extends Throwable> |
unwrapTo(Class<T> klass,
Throwable t) |
static String |
urlDecodeUtf8(String s)
URL decode the given string using the UTF-8 charset.
|
static String |
urlEncodeUtf8(String s)
URL encode the given string using the UTF-8 charset.
|
static String |
uuid() |
static void |
validateCycleFree(StormTopology topology,
String name)
Validate that the topology is cycle free.
|
static void |
validateTopologyBlobStoreMap(Map<String,Object> topoConf)
Validate topology blobstore map.
|
static void |
validateTopologyBlobStoreMap(Map<String,Object> topoConf,
BlobStore blobStore)
Validate topology blobstore map.
|
static void |
validateTopologyBlobStoreMap(Map<String,Object> topoConf,
NimbusBlobStore client)
Validate topology blobstore map.
|
static void |
validateTopologyName(String name)
Validates topology name.
|
static RuntimeException |
wrapInRuntime(Exception e) |
static double |
zeroIfNaNOrInf(double x) |
public static final org.slf4j.Logger LOG
public static final String DEFAULT_STREAM_ID
public static final Pattern BLOB_KEY_PATTERN
public static Utils setInstance(Utils u)
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
u
- a Utils instance
public static void setClassLoaderForJavaDeSerialize(ClassLoader cl)
public static void resetClassLoaderForJavaDeSerialize()
public static Map<String,Object> findAndReadConfigFile(String name, boolean mustExist)
public static String urlEncodeUtf8(String s)
URL encode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLEncoder.encode(String, Charset) instead, which obsoletes this method.
public static String urlDecodeUtf8(String s)
URL decode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLDecoder.decode(String, Charset) instead, which obsoletes this method.
public static void addShutdownHookWithForceKillIn1Sec(Runnable func)
Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for a second and then halts the runtime to avoid any zombie process in case cleanup function hangs.
public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs)
Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for numSecs and then halts the runtime to avoid any zombie process in case cleanup function hangs.
public static boolean isSystemId(String id)
public static Utils.SmartThread asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName)
Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.
The given afn may be a Callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable that in turn returns the number of seconds to sleep. In the latter case, isFactory must be true.
afn
- the code to call on each iteration
isDaemon
- whether the new thread should be a daemon thread
eh
- code to call when afn throws an exception
priority
- the new thread’s priority
isFactory
- whether afn returns a callable instead of sleep seconds
startImmediately
- whether to start the thread before returning
threadName
- a suffix to be appended to the thread name
See also: Thread
public static Utils.SmartThread asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh)
Convenience method used when only the function and name suffix are given.
afn
- the code to call on each iteration
threadName
- a suffix to be appended to the thread name
See also: Thread
public static Utils.SmartThread asyncLoop(Callable afn)
Convenience method used when only the function is given.
afn
- the code to call on each iteration
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable)
Checks if a throwable is an instance of a particular class.
klass
- The class you’re expecting
throwable
- The throwable you expect to be an instance of klass
public static <T extends Throwable> void unwrapAndThrow(Class<T> klass, Throwable t) throws T
T extends Throwable
public static RuntimeException wrapInRuntime(Exception e)
public static long secureRandomLong()
public static String hostname() throws UnknownHostException
Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
UnknownHostException
public static String localHostname() throws UnknownHostException
UnknownHostException
public static void exitProcess(int val, String msg)
public static String uuid()
public static byte[] javaSerialize(Object obj)
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz)
public static <S,T> T get(Map<S,T> m, S key, T def)
public static double zeroIfNaNOrInf(double x)
public static org.apache.storm.shade.org.apache.zookeeper.data.Id parseZkId(String id, String configName)
public static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl(Map<String,Object> conf)
Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled.
conf
- the config to get the super User ACL from
public static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL(Map<String,Object> conf)
Get the ZK ACLs that a worker should use when writing to ZK.
conf
- the config for the topology.
public static boolean isZkAuthenticationConfiguredTopology(Map<String,Object> conf)
Is the topology configured to have ZooKeeper authentication.
conf
- the topology configuration
public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker)
Handles uncaught exceptions.
worker
- true if this is for handling worker exceptions
public static void handleUncaughtException(Throwable t)
public static void handleWorkerUncaughtException(Throwable t)
public static byte[] thriftSerialize(org.apache.storm.thrift.TBase t)
public static <T> T thriftDeserialize(Class<T> c, byte[] b)
public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length)
public static void sleepNoSimulation(long millis)
public static void sleep(long millis)
public static Utils.UptimeComputer makeUptimeComputer()
public static <K,V> HashMap<V,List<K>> reverseMap(Map<K,V> map)
“{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}”.
Example usage in Java: Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);
The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is responsible for passing an ordered map if they expect the result to be consistently ordered as well.
map
- to reverse
public static Map<Object,List<Object>> reverseMap(List<List<Object>> listSeq)
“[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 [:c]}” Reverses an assoc-list style Map like reverseMap(Map…)
listSeq
- to reverse
public static boolean isOnWindows()
public static boolean checkFileExists(String path)
public static void forceDelete(String path) throws IOException
Deletes a file or directory and its contents if it exists. Does not complain if the input is null or does not exist.
path
- the path to the file or directory
IOException
public static byte[] serialize(Object obj)
public static <T> T deserialize(byte[] serialized, Class<T> clazz)
public static String serializeToString(Object obj)
Serialize an object using the configured serialization and then base64 encode it into a string.
obj
- the object to encode
public static <T> T deserializeFromString(String str, Class<T> clazz)
Deserialize an object stored in a string. The String is assumed to be a base64 encoded string containing the bytes to actually deserialize.
str
- the encoded string.
clazz
- the thrift class we are expecting.
T
- The type of clazz
public static byte[] toByteArray(ByteBuffer buffer)
public static Runnable mkSuicideFn()
public static void readAndLogStream(String prefix, InputStream in)
public static ComponentCommon getComponentCommon(StormTopology topology, String id)
public static byte[] gzip(byte[] data)
public static byte[] gunzip(byte[] data)
public static GlobalStreamId getGlobalStreamId(String componentId, String streamId)
public static Object getSetComponentObject(ComponentObject obj)
public static int toPositive(int number)
A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input number is negative, the returned positive value is the original value bitwise ANDed with Integer.MAX_VALUE (0x7fffffff), which is not its absolute value.
number
- a given number
public static String processPid()
Get process PID.
public static Map<String,Object> redactValue(Map<String,Object> m, String key)
Creates a new map with a string value in the map replaced with an equal-length string of ‘#’. (If the value is not a string, toString() will be called on it and the result will be redacted.)
m
- The map that a value will be redacted from
key
- The key pointing to the value to be redacted
public static Thread.UncaughtExceptionHandler createDefaultUncaughtExceptionHandler()
public static Thread.UncaughtExceptionHandler createWorkerUncaughtExceptionHandler()
public static void setupDefaultUncaughtExceptionHandler()
public static void setupWorkerUncaughtExceptionHandler()
public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue)
parses the arguments to extract jvm heap memory size in MB.
public static ClientBlobStore getClientBlobStore(Map<String,Object> conf)
public static TopologyInfo getTopologyInfo(String name, String asUser, Map<String,Object> topoConf)
public static String getTopologyId(String name, Nimbus.Iface client)
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
topoConf
- Topology configuration
InvalidTopologyException
AuthorizationException
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, NimbusBlobStore client) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
topoConf
- Topology configuration
client
- The NimbusBlobStore client. It must call prepare() before being used here.
InvalidTopologyException
AuthorizationException
public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, BlobStore blobStore) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
public static String threadDump()
Gets some information, including stack trace, for a running thread.
public static boolean checkDirExists(String dir)
public static Object getConfiguredClass(Map<String,Object> conf, Object configKey)
Return a new instance of a pluggable specified in the conf.
conf
- The conf to read from.
configKey
- The key pointing to the pluggable class
public static boolean isZkAuthenticationConfiguredStormServer(Map<String,Object> conf)
Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
conf
- the storm configuration, not the topology configuration
public static double nullToZero(Double v)
public static <V> V OR(V a, V b)
Returns a or b, whichever is the first one that is not null.
a
- something
b
- something else
public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll)
Fills up chunks out of a collection (given a maximum amount of chunks).
e.g. partitionFixed(5, [1,2,3]) -> 1,2,3; partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]; partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
maxNumChunks
- the maximum number of chunks to return
coll
- the collection to be chunked up
public static int getAvailablePort(int preferredPort)
Gets an available port. Consider if it is possible to pass port 0 to the server instead of using this method, since there is no guarantee that the port returned by this method will remain free.
public static int getAvailablePort()
Shortcut to calling getAvailablePort(int)
with 0 as the preferred port.
public static <T> T findOne(IPredicate<T> pred, Collection<T> coll)
Find the first item of coll for which pred.test(…) returns true.
pred
- The IPredicate to test for
coll
- The Collection of items to search through.
public static <T,U> T findOne(IPredicate<T> pred, Map<U,T> map)
public static String memoizedLocalHostname() throws UnknownHostException
UnknownHostException
public static StormTopology addVersions(StormTopology topology)
Add version information to the given topology.
topology
- the topology being submitted (MIGHT BE MODIFIED)
public static NavigableMap<SimpleVersion,List<String>> getConfiguredClasspathVersions(Map<String,Object> conf, List<String> currentClassPath)
Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
conf
- what to read it out of
currentClassPath
- the current classpath for this version of storm (not included in the conf, but returned by this)
public static NavigableMap<String,IVersionInfo> getAlternativeVersionsMap(Map<String,Object> conf)
Get a mapping of the configured supported versions of storm to their actual versions.
conf
- what to read the configuration out of.
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerMainVersions(Map<String,Object> conf)
Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
conf
- what to read it out of
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerLogWriterVersions(Map<String,Object> conf)
Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
conf
- what to read it out of
public static <T> T getCompatibleVersion(NavigableMap<SimpleVersion,T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue)
public static Map<String,Object> getConfigFromClasspath(List<String> cp, Map<String,Object> conf) throws IOException
IOException
public static boolean isLocalhostAddress(String address)
public static <K,V> Map<K,V> merge(Map<? extends K,? extends V> first, Map<? extends K,? extends V> other)
protected void forceDeleteImpl(String path) throws IOException
IOException
public Utils.UptimeComputer makeUptimeComputerImpl()
protected String localHostnameImpl() throws UnknownHostException
UnknownHostException
protected String hostnameImpl() throws UnknownHostException
UnknownHostException
public static boolean isValidKey(String key)
Validates blob key.
key
- Key for the blob.
public static void validateTopologyName(String name) throws IllegalArgumentException
Validates topology name.
name
- the topology name
IllegalArgumentException
- if the topology name is not valid
public static List<List<String>> findComponentCycles(StormTopology topology, String topoId)
Find and return component cycles in the topology graph when starting from a spout. Returns a list of cycles. Each cycle may consist of one or more components. Components that cannot be reached from any of the spouts are ignored.
public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException
Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
topology
- StormTopology instance to examine.
name
- Name of the topology, used in exception error message.
InvalidTopologyException
- if there are cycles, with message describing the cycles encountered.
Copyright © 2022 The Apache Software Foundation. All rights reserved.