Class Utils

java.lang.Object
org.apache.storm.utils.Utils

public class Utils extends Object
  • Field Details

    • LOG

      public static final org.slf4j.Logger LOG
    • DEFAULT_STREAM_ID

      public static final String DEFAULT_STREAM_ID
      See Also:
    • BLOB_KEY_PATTERN

      public static final Pattern BLOB_KEY_PATTERN
  • Constructor Details

    • Utils

      public Utils()
  • Method Details

    • setInstance

      public static Utils setInstance(Utils u)
      Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
      Parameters:
      u - a Utils instance
      Returns:
      the previously set instance
    • setClassLoaderForJavaDeSerialize

      public static void setClassLoaderForJavaDeSerialize(ClassLoader cl)
    • resetClassLoaderForJavaDeSerialize

      public static void resetClassLoaderForJavaDeSerialize()
    • findResources

      public static List<URL> findResources(String name)
    • findAndReadConfigFile

      public static Map<String,Object> findAndReadConfigFile(String name, boolean mustExist)
    • findAndReadConfigFile

      public static Map<String,Object> findAndReadConfigFile(String name)
    • readDefaultConfig

      public static Map<String,Object> readDefaultConfig()
    • urlEncodeUtf8

      public static String urlEncodeUtf8(String s)
      URL encode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLEncoder.encode(String, Charset) instead, which obsoletes this method.
    • urlDecodeUtf8

      public static String urlDecodeUtf8(String s)
      URL decode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLDecoder.decode(String, Charset) instead, which obsoletes this method.
    • readCommandLineOpts

      public static Map<String,Object> readCommandLineOpts()
    • readStormConfig

      public static Map<String,Object> readStormConfig()
    • bitXorVals

      public static long bitXorVals(List<Long> coll)
    • bitXor

      public static long bitXor(Long a, Long b)
    • addShutdownHookWithForceKillIn1Sec

      public static void addShutdownHookWithForceKillIn1Sec(Runnable func)
      Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for a second and then halts the runtime to avoid any zombie process in case cleanup function hangs.
    • addShutdownHookWithDelayedForceKill

      public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs)
      Adds the user supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for numSecs and then halts the runtime to avoid any zombie process in case cleanup function hangs.
    • isSystemId

      public static boolean isSystemId(String id)
    • asyncLoop

      public static Utils.SmartThread asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName)
      Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.

      The given afn may be a callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable that in turn returns the number of seconds to sleep. In the latter case isFactory.

      Parameters:
      afn - the code to call on each iteration
      isDaemon - whether the new thread should be a daemon thread
      eh - code to call when afn throws an exception
      priority - the new thread's priority
      isFactory - whether afn returns a callable instead of sleep seconds
      startImmediately - whether to start the thread before returning
      threadName - a suffix to be appended to the thread name
      Returns:
      the newly created thread
      See Also:
    • asyncLoop

      public static Utils.SmartThread asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh)
      Convenience method used when only the function and name suffix are given.
      Parameters:
      afn - the code to call on each iteration
      threadName - a suffix to be appended to the thread name
      Returns:
      the newly created thread
      See Also:
    • asyncLoop

      public static Utils.SmartThread asyncLoop(Callable afn)
      Convenience method used when only the function is given.
      Parameters:
      afn - the code to call on each iteration
      Returns:
      the newly created thread
    • exceptionCauseIsInstanceOf

      public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable)
      Checks if a throwable is an instance of a particular class.
      Parameters:
      klass - The class you're expecting
      throwable - The throwable you expect to be an instance of klass
      Returns:
      true if throwable is instance of klass, false otherwise.
    • unwrapTo

      public static <T extends Throwable> T unwrapTo(Class<T> klass, Throwable t)
    • unwrapAndThrow

      public static <T extends Throwable> void unwrapAndThrow(Class<T> klass, Throwable t) throws T
      Throws:
      T extends Throwable
    • wrapInRuntime

      public static RuntimeException wrapInRuntime(Exception e)
    • secureRandomLong

      public static long secureRandomLong()
    • hostname

      public static String hostname() throws UnknownHostException
      Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
      Returns:
      a string representation of the hostname.
      Throws:
      UnknownHostException
    • localHostname

      public static String localHostname() throws UnknownHostException
      Throws:
      UnknownHostException
    • exitProcess

      public static void exitProcess(int val, String msg)
    • uuid

      public static String uuid()
    • javaSerialize

      public static byte[] javaSerialize(Object obj)
    • javaDeserialize

      public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz)
    • get

      public static <S, T> T get(Map<S,T> m, S key, T def)
    • zeroIfNaNOrInf

      public static double zeroIfNaNOrInf(double x)
    • join

      public static <T> String join(Iterable<T> coll, String sep)
    • parseZkId

      public static org.apache.storm.shade.org.apache.zookeeper.data.Id parseZkId(String id, String configName)
    • getSuperUserAcl

      public static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl(Map<String,Object> conf)
      Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled.
      Parameters:
      conf - the config to get the super User ACL from
      Returns:
      the super user ACL.
    • getWorkerACL

      public static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL(Map<String,Object> conf)
      Get the ZK ACLs that a worker should use when writing to ZK.
      Parameters:
      conf - the config for the topology.
      Returns:
      the ACLs
    • isZkAuthenticationConfiguredTopology

      public static boolean isZkAuthenticationConfiguredTopology(Map<String,Object> conf)
      Is the topology configured to have ZooKeeper authentication.
      Parameters:
      conf - the topology configuration
      Returns:
      true if ZK is configured else false
    • handleUncaughtException

      public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker)
      Handles uncaught exceptions.
      Parameters:
      worker - true if this is for handling worker exceptions
    • handleUncaughtException

      public static void handleUncaughtException(Throwable t)
    • handleWorkerUncaughtException

      public static void handleWorkerUncaughtException(Throwable t)
    • thriftSerialize

      public static byte[] thriftSerialize(org.apache.storm.thrift.TBase t)
    • thriftDeserialize

      public static <T> T thriftDeserialize(Class<T> c, byte[] b)
    • thriftDeserialize

      public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length)
    • sleepNoSimulation

      public static void sleepNoSimulation(long millis)
    • sleep

      public static void sleep(long millis)
    • makeUptimeComputer

      public static Utils.UptimeComputer makeUptimeComputer()
    • reverseMap

      public static <K, V> HashMap<V,List<K>> reverseMap(Map<K,V> map)
      "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}".

      Example usage in java: Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);

      The order of he resulting list values depends on the ordering properties of the Map passed in. The caller is responsible for passing an ordered map if they expect the result to be consistently ordered as well.

      Parameters:
      map - to reverse
      Returns:
      a reversed map
    • reverseMap

      public static Map<Object,List<Object>> reverseMap(List<List<Object>> listSeq)
      "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}" Reverses an assoc-list style Map like reverseMap(Map...)
      Parameters:
      listSeq - to reverse
      Returns:
      a reversed map
    • isOnWindows

      public static boolean isOnWindows()
    • checkFileExists

      public static boolean checkFileExists(String path)
    • forceDelete

      public static void forceDelete(String path) throws IOException
      Deletes a file or directory and its contents if it exists. Does not complain if the input is null or does not exist.
      Parameters:
      path - the path to the file or directory
      Throws:
      IOException
    • serialize

      public static byte[] serialize(Object obj)
    • deserialize

      public static <T> T deserialize(byte[] serialized, Class<T> clazz)
    • serializeToString

      public static String serializeToString(Object obj)
      Serialize an object using the configured serialization and then base64 encode it into a string.
      Parameters:
      obj - the object to encode
      Returns:
      a string with the encoded object in it.
    • deserializeFromString

      public static <T> T deserializeFromString(String str, Class<T> clazz)
      Deserialize an object stored in a string. The String is assumed to be a base64 encoded string containing the bytes to actually deserialize.
      Parameters:
      str - the encoded string.
      clazz - the thrift class we are expecting.
      Returns:
      the decoded object
    • toByteArray

      public static byte[] toByteArray(ByteBuffer buffer)
    • mkSuicideFn

      public static Runnable mkSuicideFn()
    • readAndLogStream

      public static void readAndLogStream(String prefix, InputStream in)
    • getComponentCommon

      public static ComponentCommon getComponentCommon(StormTopology topology, String id)
    • tuple

      public static List<Object> tuple(Object... values)
    • gzip

      public static byte[] gzip(byte[] data)
    • gunzip

      public static byte[] gunzip(byte[] data)
    • getRepeat

      public static List<String> getRepeat(List<String> list)
    • getGlobalStreamId

      public static GlobalStreamId getGlobalStreamId(String componentId, String streamId)
    • getSetComponentObject

      public static Object getSetComponentObject(ComponentObject obj)
    • toPositive

      public static int toPositive(int number)
      A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input number is negative, the returned positive value is the original value bit AND against Integer.MAX_VALUE(0x7fffffff) which is not its absolutely value.
      Parameters:
      number - a given number
      Returns:
      a positive number.
    • processPid

      public static String processPid()
      Get process PID.
      Returns:
      the pid of this JVM, because Java doesn't provide a real way to do this.
    • fromCompressedJsonConf

      public static Map<String,Object> fromCompressedJsonConf(byte[] serialized)
    • redactValue

      public static Map<String,Object> redactValue(Map<String,Object> m, String key)
      Creates a new map with a string value in the map replaced with an equivalently-lengthed string of '#'. (If the object is not a string to string will be called on it and replaced)
      Parameters:
      m - The map that a value will be redacted from
      key - The key pointing to the value to be redacted
      Returns:
      a new map with the value redacted. The original map will not be modified.
    • createDefaultUncaughtExceptionHandler

      public static Thread.UncaughtExceptionHandler createDefaultUncaughtExceptionHandler()
    • createWorkerUncaughtExceptionHandler

      public static Thread.UncaughtExceptionHandler createWorkerUncaughtExceptionHandler()
    • setupDefaultUncaughtExceptionHandler

      public static void setupDefaultUncaughtExceptionHandler()
    • setupWorkerUncaughtExceptionHandler

      public static void setupWorkerUncaughtExceptionHandler()
    • parseJvmHeapMemByChildOpts

      public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue)
      parses the arguments to extract jvm heap memory size in MB.
      Returns:
      the value of the JVM heap memory setting (in MB) in a java command.
    • getClientBlobStore

      public static ClientBlobStore getClientBlobStore(Map<String,Object> conf)
    • isValidConf

      public static boolean isValidConf(Map<String,Object> topoConfIn)
    • getTopologyInfo

      public static TopologyInfo getTopologyInfo(String name, String asUser, Map<String,Object> topoConf)
    • getTopologyId

      public static String getTopologyId(String name, Nimbus.Iface client)
    • validateTopologyBlobStoreMap

      public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf) throws InvalidTopologyException, AuthorizationException
      Validate topology blobstore map.
      Parameters:
      topoConf - Topology configuration
      Throws:
      InvalidTopologyException
      AuthorizationException
    • validateTopologyBlobStoreMap

      public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, NimbusBlobStore client) throws InvalidTopologyException, AuthorizationException
      Validate topology blobstore map.
      Parameters:
      topoConf - Topology configuration
      client - The NimbusBlobStore client. It must call prepare() before being used here.
      Throws:
      InvalidTopologyException
      AuthorizationException
    • validateTopologyBlobStoreMap

      public static void validateTopologyBlobStoreMap(Map<String,Object> topoConf, BlobStore blobStore) throws InvalidTopologyException, AuthorizationException
      Validate topology blobstore map.
      Throws:
      InvalidTopologyException
      AuthorizationException
    • threadDump

      public static String threadDump()
      Gets some information, including stack trace, for a running thread.
      Returns:
      A human-readable string of the dump.
    • checkDirExists

      public static boolean checkDirExists(String dir)
    • getConfiguredClass

      public static Object getConfiguredClass(Map<String,Object> conf, Object configKey)
      Return a new instance of a pluggable specified in the conf.
      Parameters:
      conf - The conf to read from.
      configKey - The key pointing to the pluggable class
      Returns:
      an instance of the class or null if it is not specified.
    • isZkAuthenticationConfiguredStormServer

      public static boolean isZkAuthenticationConfiguredStormServer(Map<String,Object> conf)
      Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
      Parameters:
      conf - the storm configuration, not the topology configuration
      Returns:
      true if it is configured else false.
    • toCompressedJsonConf

      public static byte[] toCompressedJsonConf(Map<String,Object> topoConf)
    • nullToZero

      public static double nullToZero(Double v)
    • OR

      public static <V> V OR(V a, V b)
      a or b the first one that is not null.
      Parameters:
      a - something
      b - something else
      Returns:
      a or b the first one that is not null
    • integerDivided

      public static TreeMap<Integer,Integer> integerDivided(int sum, int numPieces)
    • partitionFixed

      public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll)
      Fills up chunks out of a collection (given a maximum amount of chunks).

      i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]] partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]] partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]

      Parameters:
      maxNumChunks - the maximum number of chunks to return
      coll - the collection to be chunked up
      Returns:
      a list of the chunks, which are themselves lists.
    • readYamlFile

      public static Object readYamlFile(String yamlFile)
    • getAvailablePort

      public static int getAvailablePort(int preferredPort)
      Gets an available port. Consider if it is possible to pass port 0 to the server instead of using this method, since there is no guarantee that the port returned by this method will remain free.
      Returns:
      The preferred port if available, or a random available port
    • getAvailablePort

      public static int getAvailablePort()
      Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
      Returns:
      A random available port
    • findOne

      public static <T> T findOne(IPredicate<T> pred, Collection<T> coll)
      Find the first item of coll for which pred.test(...) returns true.
      Parameters:
      pred - The IPredicate to test for
      coll - The Collection of items to search through.
      Returns:
      The first matching value in coll, or null if nothing matches.
    • findOne

      public static <T, U> T findOne(IPredicate<T> pred, Map<U,T> map)
    • parseJson

      public static Map<String,Object> parseJson(String json)
    • memoizedLocalHostname

      public static String memoizedLocalHostname() throws UnknownHostException
      Throws:
      UnknownHostException
    • addVersions

      public static StormTopology addVersions(StormTopology topology)
      Add version information to the given topology.
      Parameters:
      topology - the topology being submitted (MIGHT BE MODIFIED)
      Returns:
      topology
    • getConfiguredClasspathVersions

      public static NavigableMap<SimpleVersion,List<String>> getConfiguredClasspathVersions(Map<String,Object> conf, List<String> currentClassPath)
      Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
      Parameters:
      conf - what to read it out of
      currentClassPath - the current classpath for this version of storm (not included in the conf, but returned by this)
      Returns:
      the map
    • getAlternativeVersionsMap

      public static NavigableMap<String,IVersionInfo> getAlternativeVersionsMap(Map<String,Object> conf)
      Get a mapping of the configured supported versions of storm to their actual versions.
      Parameters:
      conf - what to read the configuration out of.
      Returns:
      the map.
    • getConfiguredWorkerMainVersions

      public static NavigableMap<SimpleVersion,String> getConfiguredWorkerMainVersions(Map<String,Object> conf)
      Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
      Parameters:
      conf - what to read it out of
      Returns:
      the map
    • getConfiguredWorkerLogWriterVersions

      public static NavigableMap<SimpleVersion,String> getConfiguredWorkerLogWriterVersions(Map<String,Object> conf)
      Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
      Parameters:
      conf - what to read it out of
      Returns:
      the map
    • getCompatibleVersion

      public static <T> T getCompatibleVersion(NavigableMap<SimpleVersion,T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue)
    • getConfigFromClasspath

      public static Map<String,Object> getConfigFromClasspath(List<String> cp, Map<String,Object> conf) throws IOException
      Throws:
      IOException
    • isLocalhostAddress

      public static boolean isLocalhostAddress(String address)
    • merge

      public static <K, V> Map<K,V> merge(Map<? extends K,? extends V> first, Map<? extends K,? extends V> other)
    • convertToArray

      public static <V> ArrayList<V> convertToArray(Map<Integer,V> srcMap, int start)
    • forceDeleteImpl

      protected void forceDeleteImpl(String path) throws IOException
      Throws:
      IOException
    • makeUptimeComputerImpl

      public Utils.UptimeComputer makeUptimeComputerImpl()
    • localHostnameImpl

      protected String localHostnameImpl() throws UnknownHostException
      Throws:
      UnknownHostException
    • hostnameImpl

      protected String hostnameImpl() throws UnknownHostException
      Throws:
      UnknownHostException
    • isValidKey

      public static boolean isValidKey(String key)
      Validates blob key.
      Parameters:
      key - Key for the blob.
    • validateTopologyName

      public static void validateTopologyName(String name) throws IllegalArgumentException
      Validates topology name.
      Parameters:
      name - the topology name
      Throws:
      IllegalArgumentException - if the topology name is not valid
    • findComponentCycles

      public static List<List<String>> findComponentCycles(StormTopology topology, String topoId)
      Find and return components cycles in the topology graph when starting from spout. Return a list of cycles. Each cycle may consist of one or more components. Components that cannot be reached from any of the spouts are ignored.
      Returns:
      a List of cycles. Each cycle has a list of component names.
    • validateCycleFree

      public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException
      Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
      Parameters:
      topology - StormTopology instance to examine.
      name - Name of the topology, used in exception error message.
      Throws:
      InvalidTopologyException - if there are cycles, with message describing the cycles encountered.