Package org.apache.storm.utils
Class ServerUtils
java.lang.Object
org.apache.storm.utils.ServerUtils
-
Field Summary
Modifier and Type, Field, Description
static final boolean IS_ON_WINDOWS
static final org.slf4j.Logger LOG
static final int SIGKILL
static final int SIGTERM
-
Constructor Summary
-
Method Summary
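Modifier and Type, Method, Description
static boolean
areAllProcessesDead
(Map<String, Object> conf, String user, String workerId, Set<Long> pids) Find if all processes for the user on workerId are dead. static boolean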
canUserReadBlob
(ReadableBlobMeta meta, String user, Map<String, Object> conf) static String
containerFilePath
(String dir) static String
Returns the value of java.class.path System property.static int
execCommand
(String... command) static void
extractZipFile
(ZipFile zipFile, File toDir, String prefix) Extracts the given file to the given directory.static void
forceKillProcess
(String pid) static ClientBlobStore
static int
getComponentParallelism
(Map<String, Object> topoConf, Object component) getComponentParallelism
(Map<String, Object> topoConf, StormTopology topology) static long
getDiskUsage
(File dir) Takes an input dir or file and returns the disk usage on that local directory.static double
getEstimatedTotalHeapMemoryRequiredByTopo
(Map<String, Object> topoConf, StormTopology topology) static int
getEstimatedWorkerCountForRasTopo
(Map<String, Object> topoConf, StormTopology topology) static String
getFileOwner
(String path) static long
Get system free memory in megabytes.static BlobStore
getNimbusBlobStore
(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector) static BlobStore
getNimbusBlobStore
(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector) static int
getPathOwnerUid
(String fpath) Get the userId of the owner of the path by running "ls -dn path" command. static URL
Returns the current thread classloader.static int
Get the userId for a user name.static <T> List<T>
interleaveAll
(List<List<T>> nodeList) static boolean
isAbsolutePath
(String path) static boolean
isAnyPosixProcessPidDirAlive
(Collection<Long> pids, String user) Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user.static boolean
isAnyPosixProcessPidDirAlive
(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser.static boolean
isAnyProcessAlive
(Collection<Long> pids, int uid) Are any of the processes alive and running for the specified userId.static boolean
isAnyProcessAlive
(Collection<Long> pids, String user) Are any of the processes alive and running for the specified user.static boolean
isProcessAlive
(long pid, String user) Is a process alive and running? static boolean
Check if the scheduler is resource aware or not.static void
static long
nimbusVersionOfBlob
(String key, ClientBlobStore cb) static Subject
principalNameToSubject
(String name) static String
scriptFilePath
(String dir) static void
sendSignalToProcess
(long lpid, int signum) static ServerUtils
Provide an instance of this class for delegates to use.static String
Returns the combined string, escaped for posix shell.static void
Unpack matching files from a jar.static void
static void
Given a tar file as input, untars it into the untar directory passed as the second parameter. static void
Given a file as input, unzips it into the unzip directory passed as the second parameter. static void
validateTopologyAckerBundleResource
(Map<String, Object> topoConf, StormTopology topology, String topoName) RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker.static void
validateTopologyWorkerMaxHeapSizeConfigs
(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) static String
Writes a posix shell script file to be executed in its own process.static String
Writes a posix shell script file to be executed in its own process.static boolean
zipDoesContainDir
(String zipfile, String target) Determines if a zip archive contains a particular directory.static long
zipFileSize
(File myFile) Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
-
Field Details
-
LOG
public static final org.slf4j.Logger LOG -
IS_ON_WINDOWS
public static final boolean IS_ON_WINDOWS -
SIGKILL
public static final int SIGKILL
-
SIGTERM
public static final int SIGTERM
-
-
Constructor Details
-
ServerUtils
public ServerUtils()
-
-
Method Details
-
setInstance
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
- Parameters:
u
- a ServerUtils instance
- Returns:
- the previously set instance
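The delegate-and-override pattern described above can be sketched as follows. `DelegateSketch` is a hypothetical stand-in, not the real `ServerUtils`. It only mirrors the `currentClasspath` / `currentClasspathImpl` pairing listed on this page and assumes the static method simply forwards to the current instance.

```java
// Hypothetical stand-in for the delegate pattern; not Storm's actual code.
class DelegateSketch {
    // The instance that static entry points delegate to.
    private static DelegateSketch instance = new DelegateSketch();

    // Swap the delegate and return the previous one so a test can restore it.
    static DelegateSketch setInstance(DelegateSketch u) {
        DelegateSketch previous = instance;
        instance = u;
        return previous;
    }

    // Static entry point that forwards to the overridable implementation.
    static String currentClasspath() {
        return instance.currentClasspathImpl();
    }

    // Overridable implementation; a test subclass can replace it.
    String currentClasspathImpl() {
        return System.getProperty("java.class.path");
    }
}
```

A test would typically call `setInstance` with a subclass, run its assertions, and then pass the returned instance back to `setInstance` to restore the original behaviour.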
-
interleaveAll
-
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector) -
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector) -
isAbsolutePath
-
shellCmd
Returns the combined string, escaped for posix shell.
- Parameters:
command
- the list of strings to be combined
- Returns:
- the resulting command string
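As an illustration of what "escaped for posix shell" can mean, here is a minimal sketch that single-quotes every argument and joins them with spaces. The class name and the exact quoting style are assumptions; the real method may escape differently.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of POSIX shell quoting; not Storm's actual implementation.
class ShellQuoteSketch {
    // Wrap one argument in single quotes. An embedded ' becomes '\''
    // (close the quote, emit an escaped quote, reopen the quote).
    static String quote(String arg) {
        return "'" + arg.replace("'", "'\\''") + "'";
    }

    // Quote each argument and join them into one command string.
    static String shellCmd(List<String> command) {
        return command.stream()
                .map(ShellQuoteSketch::quote)
                .collect(Collectors.joining(" "));
    }
}
```

Inside single quotes a POSIX shell treats every character literally, which is why only the single quote itself needs special handling.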
-
getDiskUsage
Takes an input dir or file and returns the disk usage on that local directory. Very basic implementation.
- Parameters:
dir
- The input directory or file whose disk usage is to be measured
- Returns:
- The disk usage of the input local directory
-
getClientBlobStoreForSupervisor
-
currentClasspath
Returns the value of java.class.path System property. Kept separate for testing.
- Returns:
- the classpath
-
getResourceFromClassloader
Returns the current thread classloader. -
zipDoesContainDir
Determines if a zip archive contains a particular directory.
- Parameters:
zipfile
- path to the zipped file
target
- directory being looked for in the zip.
- Returns:
- boolean whether or not the directory exists in the zip.
- Throws:
IOException
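One way such a check could work is to scan the archive's entry names for the target directory as a path prefix. The sketch below uses only `java.util.zip`. The class name and the prefix-matching rule are assumptions, not the documented behaviour of `zipDoesContainDir`.

```java
import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical sketch: does any entry live under the given directory?
class ZipDirSketch {
    static boolean zipContainsDir(String zipfile, String target) throws IOException {
        // Normalise to "dir/" so that "res" does not match "resources/...".
        String prefix = target.endsWith("/") ? target : target + "/";
        try (ZipFile zip = new ZipFile(zipfile)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                if (entries.nextElement().getName().startsWith(prefix)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```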
-
getFileOwner
- Throws:
IOException
-
containerFilePath
-
scriptFilePath
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String, String> environment) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir
- the directory under which the script is to be written
command
- the command the script is to execute
environment
- optional environment variables to set before running the script's command. May be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
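The following is a minimal sketch of writing such a script, assuming a `#!/bin/sh` header, one `export` line per environment variable, and a final `exec` line. The file name `script.sh`, the class name, and the exact script layout are illustrative assumptions and may differ from what `writeScript` actually produces.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a script writer; layout and file name are assumptions.
class WriteScriptSketch {
    // Single-quote a value for a POSIX shell.
    static String quote(String s) {
        return "'" + s.replace("'", "'\\''") + "'";
    }

    static String writeScript(String dir, List<String> command,
                              Map<String, String> environment) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add("#!/bin/sh");
        if (environment != null) {  // the environment is optional
            for (Map.Entry<String, String> e : environment.entrySet()) {
                lines.add("export " + e.getKey() + "=" + quote(e.getValue()));
            }
        }
        StringBuilder exec = new StringBuilder("exec");
        for (String part : command) {
            exec.append(' ').append(quote(part));
        }
        lines.add(exec.toString());
        Path script = Paths.get(dir, "script.sh");  // assumed file name
        Files.write(script, lines, StandardCharsets.UTF_8);
        return script.toString();
    }
}
```

Using `exec` replaces the shell with the command, so the script does not leave an extra shell process behind.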
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String, String> environment, String umask) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir
- the directory under which the script is to be written
command
- the command the script is to execute
environment
- optional environment variables to set before running the script's command. May be null.
umask
- umask to be set. It can be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
-
execCommand
public static int execCommand(String... command) throws org.apache.commons.exec.ExecuteException, IOException
- Throws:
org.apache.commons.exec.ExecuteException
IOException
-
sendSignalToProcess
- Throws:
IOException
-
killProcessWithSigTerm
- Throws:
IOException
-
forceKillProcess
- Throws:
IOException
-
nimbusVersionOfBlob
public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException -
canUserReadBlob
-
unJar
Unpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped.
- Parameters:
jarFile
- the .jar file to unpack
toDir
- the destination directory into which to unpack the jar
- Throws:
IOException
-
unTar
Given a tar file as input, untars it into the untar directory passed as the second parameter. This utility will untar ".tar", ".tar.gz", and ".tgz" files.
- Parameters:
inFile
- The tar file as input
untarDir
- The untar directory where to untar the tar file
symlinksDisabled
- true if symlinks should be disabled, else false
- Throws:
IOException
-
unpack
- Throws:
IOException
-
extractZipFile
Extracts the given file to the given directory. Only zip entries starting with the given prefix are extracted. The prefix is stripped off entry names before extraction.
- Parameters:
zipFile
- The zip file to extract
toDir
- The directory to extract to
prefix
- The prefix to look for in the zip file. If not null, only paths starting with the prefix will be extracted
- Throws:
IOException
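The filter-and-strip behaviour described above can be sketched with `java.util.zip` alone. The class name, the skipping of directory entries, and the path-escape guard are assumptions added for illustration; they are not confirmed details of `extractZipFile`.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical sketch of prefix-filtered extraction.
class ExtractPrefixSketch {
    static void extract(ZipFile zipFile, File toDir, String prefix) throws IOException {
        File root = toDir.getCanonicalFile();
        Enumeration<? extends ZipEntry> entries = zipFile.entries();
        while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            String name = entry.getName();
            // Skip directories and anything outside the requested prefix.
            if (entry.isDirectory() || (prefix != null && !name.startsWith(prefix))) {
                continue;
            }
            // Strip the prefix so "resources/a.txt" lands at "<toDir>/a.txt".
            String relative = prefix == null ? name : name.substring(prefix.length());
            if (relative.isEmpty()) {
                continue;
            }
            File target = new File(root, relative).getCanonicalFile();
            // Assumed safety check: refuse entries such as "../x" that escape toDir.
            if (!target.toPath().startsWith(root.toPath())) {
                throw new IOException("Entry escapes target directory: " + name);
            }
            Files.createDirectories(target.getParentFile().toPath());
            try (InputStream in = zipFile.getInputStream(entry)) {
                Files.copy(in, target.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}
```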
-
unZip
Given a file as input, unzips it into the unzip directory passed as the second parameter.
- Parameters:
inFile
- The zip file as input
toDir
- The unzip directory where to unzip the zip file
- Throws:
IOException
-
zipFileSize
Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
- Parameters:
myFile
- The zip file as input
- Returns:
- zip file size as a long
- Throws:
IOException
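The "modulo 2^32, per gzip specifications" remark matches how the gzip format stores the uncompressed length: the last four bytes of a gzip stream hold the size modulo 2^32 in little-endian order. The sketch below reads that trailer. It assumes the input is a gzip file, and it is not confirmed to be how `zipFileSize` is implemented.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch: read the 4-byte size trailer of a gzip file.
class GzipSizeSketch {
    static long uncompressedSize(File gz) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(gz, "r")) {
            raf.seek(raf.length() - 4);  // the trailer occupies the last 4 bytes
            long b0 = raf.read();
            long b1 = raf.read();
            long b2 = raf.read();
            long b3 = raf.read();
            // Little-endian: the first byte read is the least significant.
            return (b3 << 24) | (b2 << 16) | (b1 << 8) | b0;
        }
    }
}
```

Because only 32 bits are stored, an input of 4 GB or more wraps around, which is the limitation the description mentions.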
-
isRas
Check if the scheduler is resource aware or not.
- Parameters:
conf
- The configuration
- Returns:
- True if it's resource aware; false otherwise
-
getEstimatedWorkerCountForRasTopo
public static int getEstimatedWorkerCountForRasTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getEstimatedTotalHeapMemoryRequiredByTopo
public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static Map<String,Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static int getComponentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
principalNameToSubject
-
currentClasspathImpl
-
getResourceFromClassloaderImpl
-
getMemInfoFreeMb
Get system free memory in megabytes.
- Returns:
- system free memory in megabytes
- Throws:
IOException
- on I/O exception
-
isProcessAlive
Is a process alive and running?
- Parameters:
pid
- the PID of the running process
user
- the user that is expected to own that process
- Returns:
- true if it is, else false
- Throws:
IOException
- on any error
-
isAnyProcessAlive
Are any of the processes alive and running for the specified user. If collection is empty or null then the return value is trivially false.
- Parameters:
pids
- the PIDs of the running processes
user
- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException
- on I/O exception
-
isAnyProcessAlive
Are any of the processes alive and running for the specified userId. If collection is empty or null then the return value is trivially false.
- Parameters:
pids
- the PIDs of the running processes
uid
- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException
- on I/O exception
-
getUserId
Get the userId for a user name. This works on Posix systems by using the "id -u" command. Throws IllegalArgumentException on Windows.
- Parameters:
user
- username to be converted to UID. This is optional, in which case current user is returned.
- Returns:
- UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
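As a rough illustration of the "id -u" approach, the sketch below runs the command with `ProcessBuilder` and parses its output, returning -1 on any failure. The class and helper names are hypothetical, and the Windows check mentioned above is omitted.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of resolving a UID through "id -u [user]" on POSIX systems.
class UserIdSketch {
    // Parse the command output; anything that is not a number yields -1.
    static int parseUid(String output) {
        if (output == null) {
            return -1;
        }
        try {
            return Integer.parseInt(output.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    static int getUserId(String user) {
        List<String> cmd = new ArrayList<>(List.of("id", "-u"));
        if (user != null && !user.isEmpty()) {
            cmd.add(user);  // without a user, "id -u" reports the current user
        }
        try {
            Process p = new ProcessBuilder(cmd).redirectErrorStream(true).start();
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                String line = r.readLine();
                p.waitFor();
                return parseUid(line);
            }
        } catch (IOException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```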
-
getPathOwnerUid
Get the userId of the owner of the path by running the "ls -dn path" command. This command works on Posix systems only.
- Parameters:
fpath
- full path to the file or directory.
- Returns:
- UID of the owner of the specified path if successful, -1 upon failure.
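With `-n`, `ls` prints numeric owner and group IDs, so the owner UID is the third whitespace-separated field of the output line. A minimal parsing sketch follows; the class and method names are hypothetical, and running the command itself is left out.

```java
// Hypothetical sketch: extract the owner UID from one line of "ls -dn <path>" output.
class LsOwnerSketch {
    static int parseOwnerUid(String lsLine) {
        if (lsLine == null) {
            return -1;
        }
        // Fields: permissions, link count, uid, gid, size, date..., path
        String[] fields = lsLine.trim().split("\\s+");
        if (fields.length < 3) {
            return -1;
        }
        try {
            return Integer.parseInt(fields[2]);
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```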
-
areAllProcessesDead
public static boolean areAllProcessesDead(Map<String, Object> conf, String user, String workerId, Set<Long> pids) throws IOException
Find if all processes for the user on workerId are dead. This method attempts to optimize the calls by:
- checking a collection of ProcessIds at once
- using userId on Posix systems instead of user
- Returns:
- true if all processes for the user are dead on the worker
- Throws:
IOException
- if external commands have exception.
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).
Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, the PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids
- Process IDs that need to be monitored for liveness
user
- the userId that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException
- on I/O exception
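The directory-based liveness check can be sketched with `java.nio.file`. The version below takes the proc root as a parameter so it can be pointed at a temporary directory in tests. It compares owner names rather than UIDs, and the class and method names are hypothetical, so treat it as an approximation of the idea rather than the method's actual logic.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Collection;

// Hypothetical sketch: a PID counts as alive if <procRoot>/<pid> exists and is owned by the user.
class ProcDirSketch {
    static boolean isAnyPidDirOwnedBy(Path procRoot, Collection<Long> pids, String user)
            throws IOException {
        if (pids == null || pids.isEmpty()) {
            return false;  // nothing to check
        }
        for (Long pid : pids) {
            Path dir = procRoot.resolve(Long.toString(pid));
            try {
                if (Files.isDirectory(dir) && Files.getOwner(dir).getName().equals(user)) {
                    return true;
                }
            } catch (NoSuchFileException e) {
                // The process exited between the two checks; treat it as dead.
            }
        }
        return false;
    }
}
```

On Linux the call would pass `Paths.get("/proc")` as the root. Checking directory ownership avoids spawning a `ps` process per check.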
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int).
Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, the PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids
- Process IDs that need to be monitored for liveness
expectedUser
- the userId that is expected to own that process
mockFileOwnerToUid
- if true (used for testing), then convert File.owner to UID
- Returns:
- true if any one of the processes is owned by expectedUser and alive, else false
- Throws:
IOException
- on I/O exception
-
validateTopologyWorkerMaxHeapSizeConfigs
public static void validateTopologyWorkerMaxHeapSizeConfigs(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
validateTopologyAckerBundleResource
public static void validateTopologyAckerBundleResource(Map<String, Object> topoConf, StormTopology topology, String topoName) throws InvalidTopologyException
RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker. Validations are performed here: (Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER * memory for an acker + memory for the biggest topo executor) < max worker heap memory. When RAS tries to schedule an executor to a new worker, it will put Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER ackers into the worker first, so Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB needs to be able to accommodate this.
- Parameters:
topoConf
- Topology conf
topology
- Topology (not system topology)
topoName
- The name of the topology
- Throws:
InvalidTopologyException
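The inequality in the description can be written out as a small check. This is only a sketch of the arithmetic: the class, method, and parameter names are hypothetical, and the real method reads these values from the topology configuration and throws InvalidTopologyException instead of returning a boolean.

```java
// Hypothetical sketch of the acker-bundle memory check described above.
class AckerBundleCheckSketch {
    // True when the ackers placed on a new worker plus the largest executor
    // fit strictly below the worker's maximum heap size (all values in MB).
    static boolean fitsInWorker(int ackersPerWorker, double ackerMemMb,
                                double biggestExecutorMemMb, double maxWorkerHeapMb) {
        return ackersPerWorker * ackerMemMb + biggestExecutorMemMb < maxWorkerHeapMb;
    }
}
```

For example, with one acker of 128 MB per worker, a largest executor of 512 MB, and a 768 MB heap limit, the sum is 640 MB and the check passes. With two ackers per worker the sum reaches 768 MB, which is not strictly below the limit, so the check fails.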
-