(NOTE: this page is based on the 0.7.1 code; many things have changed since then, including a split between tasks and executors, and a reorganization of the code under `storm-core/src` rather than `src/`.)
This page explains in detail the lifecycle of a topology: from running the "storm jar" command, to uploading the topology to Nimbus, to the supervisors starting and stopping workers, to workers and tasks setting themselves up. It also explains how Nimbus monitors topologies and how topologies are shut down when they are killed.
First, a couple of important notes about topologies:

- The topology that actually runs differs from the one the user specifies: Storm adds implicit streams and an implicit "acker" bolt for the message-acking framework, and this implicit topology is built by the `system-topology!` function.
- `system-topology!` is used in two places: when Nimbus is creating tasks for the topology, and in the worker so it knows where it needs to route messages to.
- The "storm jar" command runs your class with the specified arguments, and also sets the "storm.jar" property so the jar can be found by `StormSubmitter` later.
- When your code uses `StormSubmitter.submitTopology`, `StormSubmitter` takes the following actions:
  - First, it uploads the jar if it hasn't been uploaded before:
    - `beginFileUpload` returns a path in Nimbus's inbox
    - the jar is uploaded in chunks with `uploadChunk`
    - `finishFileUpload` is called when it's finished uploading
  - `StormSubmitter` then calls `submitTopology` on the Nimbus thrift interface:
    - the `submitTopology` call takes in the Nimbus inbox path where the jar was uploaded
- Nimbus receives the topology submission.
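Putting the client-side steps together, here is a minimal sketch of the user code that kicks off this whole flow. `MySentenceSpout` and `MyCountBolt` are hypothetical components, and the 0.8-era style API (string component ids, `backtype.storm` packages) is assumed; exact signatures vary across Storm versions.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class SubmitTopologyExample {
    public static void main(String[] args) throws Exception {
        // Build the user-level topology; MySentenceSpout and MyCountBolt are hypothetical.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentences", new MySentenceSpout(), 2);
        builder.setBolt("counts", new MyCountBolt(), 4).shuffleGrouping("sentences");

        Config conf = new Config();
        conf.setNumWorkers(3);

        // StormSubmitter reads the "storm.jar" property set by the "storm jar" command,
        // uploads that jar to Nimbus's inbox (beginFileUpload / uploadChunk / finishFileUpload),
        // and then calls submitTopology on the Nimbus thrift interface with the inbox path.
        StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
    }
}
```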
- Nimbus normalizes the topology configuration. The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly.
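For example, custom serialization registrations travel with the topology config and are merged with the cluster defaults during normalization. A small sketch, assuming the `registerSerialization` helper available in recent Storm releases; `MyDataType` and `OtherType` are hypothetical user classes:

```java
import backtype.storm.Config;

public class SerializationConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Kryo registrations are carried in the topology config (topology.kryo.register).
        // Nimbus's normalization merges them with the cluster-wide defaults so that
        // every task ends up with an identical registration list.
        conf.registerSerialization(MyDataType.class);  // hypothetical type
        conf.registerSerialization(OtherType.class);   // hypothetical type
        return conf;
    }
}
```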
- Nimbus sets up the static state for the topology:
  - `setup-storm-static` writes the task -> component mapping into ZK
  - `setup-heartbeats` creates a ZK "directory" in which tasks can heartbeat
- Nimbus calls `mk-assignment` to assign tasks to machines. An assignment contains:
  - `master-code-dir`: used by supervisors to download the correct jars/configs for the topology from Nimbus
  - `task->node+port`: a map from a task id to the worker that task should be running on (a worker is identified by a node/port pair)
  - `node->host`: a map from node id to hostname. This is used so workers know which machines to connect to in order to communicate with other workers. Node ids are used to identify supervisors so that multiple supervisors can be run on one machine; one place this is done is with the Mesos integration.
  - `task->start-time-secs`: a map from task id to the timestamp at which Nimbus launched that task. This is used by Nimbus when monitoring topologies, as tasks are given a longer timeout to heartbeat when they're first launched (the launch timeout is configured by the "nimbus.task.launch.secs" config).
- Once topologies are assigned, they're initially in a deactivated mode. `start-storm` writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts.
  - TODO: cluster state diagram (show all nodes and what's kept everywhere)
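The assignment record described above is a small piece of Clojure data stored by Nimbus; the following is only an illustrative Java sketch of its shape, with hypothetical class and field names mirroring the descriptions above.

```java
import java.util.Map;

// Illustrative only: the real assignment is a Clojure record written to Zookeeper.
public class AssignmentSketch {
    String masterCodeDir;                       // master-code-dir: where supervisors fetch jars/configs
    Map<Integer, NodePort> taskToNodePort;      // task->node+port: task id -> worker slot
    Map<String, String> nodeToHost;             // node->host: supervisor node id -> hostname
    Map<Integer, Integer> taskToStartTimeSecs;  // task->start-time-secs: task id -> launch time

    static class NodePort {
        String nodeId;  // identifies a supervisor (several supervisors can share one host)
        int port;       // the worker's port on that node
    }
}
```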
- The supervisor runs two functions in the background:
  - `synchronize-supervisor`: called whenever assignments in Zookeeper change, and also every 10 seconds.
  - `sync-processes`: reads from the local filesystem (LFS) what `synchronize-supervisor` wrote and compares that to what's actually running on the machine, then starts/stops worker processes as necessary to synchronize (a rough sketch of this reconcile step follows this list).
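A rough sketch of the `sync-processes` idea; the real implementation is Clojure inside the supervisor, and the helper names below are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of the reconcile loop run by sync-processes:
// compare what the local filesystem says should run (written by
// synchronize-supervisor) with what is actually running, then converge.
class SyncProcessesSketch {
    void syncProcesses(Map<Integer, String> assignedPortToTopology,   // desired: port -> topology id
                       Map<Integer, String> runningPortToTopology) {  // actual: port -> topology id
        for (Map.Entry<Integer, String> running : runningPortToTopology.entrySet()) {
            String desired = assignedPortToTopology.get(running.getKey());
            if (desired == null || !desired.equals(running.getValue())) {
                // Worker is no longer assigned here (or runs the wrong topology): stop it.
                // A mismatched port gets relaunched on a later sync pass.
                killWorker(running.getKey());
            }
        }
        for (Map.Entry<Integer, String> desired : assignedPortToTopology.entrySet()) {
            if (!runningPortToTopology.containsKey(desired.getKey())) {
                launchWorker(desired.getKey(), desired.getValue()); // assigned but not running yet
            }
        }
    }

    void killWorker(int port) { /* hypothetical: stop the worker process on this port */ }
    void launchWorker(int port, String topologyId) { /* hypothetical: fork a new worker JVM */ }
}
```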
- Worker processes start up through the `mk-worker` function.
  - The worker keeps track of whether the topology is active in the `storm-active-atom` variable. This variable is used by tasks to determine whether or not to call `nextTuple` on the spouts (see the spout-loop sketch below).
- Tasks are set up through the `mk-task` function.
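A sketch of how the worker's active flag gates the spout loop; the names and loop structure below are hypothetical (the real loop lives in the task/executor code), but it shows why `nextTuple` is only called while the topology is active.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import backtype.storm.spout.ISpout;

// Hypothetical sketch: the worker flips "active" when the topology is
// activated/deactivated in Zookeeper (the storm-active-atom above), and the
// spout task only calls nextTuple while the flag is true.
class SpoutLoopSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);

    void runSpout(ISpout spout) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            if (active.get()) {
                spout.nextTuple();  // emit tuples only while the topology is active
            } else {
                Thread.sleep(100);  // deactivated: idle instead of emitting
            }
        }
    }

    void setActive(boolean isActive) { active.set(isActive); }
}
```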
- Nimbus monitors the topology during its lifetime. The recurring "monitor" event calls `reassign-topology` through `reassign-transition`.
- `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time; `mk-assignments` is also capable of incrementally updating a topology:
  - `mk-assignments` checks heartbeats and reassigns workers as necessary
- When a topology is killed, a separate cleanup thread runs the `do-cleanup` function, which cleans up the heartbeat dir and the jars/configs stored locally.
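For intuition about the heartbeat check that drives reassignment, here is a hedged sketch; the real logic is Nimbus's Clojure code and the names below are hypothetical. It reflects the rule described above: tasks launched recently get the longer "nimbus.task.launch.secs" timeout, otherwise the normal heartbeat timeout applies, and tasks whose heartbeats are too old have their workers reassigned.

```java
import java.util.Map;

// Hypothetical sketch of the per-task heartbeat check used during the
// "monitor" event: a task is considered dead (and its worker reassignable)
// when its last heartbeat is older than the applicable timeout.
class HeartbeatCheckSketch {
    boolean isTaskAlive(int taskId,
                        int nowSecs,
                        Map<Integer, Integer> taskToLastHeartbeatSecs, // from the ZK heartbeat "directory"
                        Map<Integer, Integer> taskToStartTimeSecs,     // task->start-time-secs from the assignment
                        int launchTimeoutSecs,                         // "nimbus.task.launch.secs"
                        int taskTimeoutSecs) {                         // normal heartbeat timeout
        Integer start = taskToStartTimeSecs.get(taskId);
        boolean justLaunched = start != null && (nowSecs - start) < launchTimeoutSecs;
        int timeout = justLaunched ? launchTimeoutSecs : taskTimeoutSecs;

        Integer lastBeat = taskToLastHeartbeatSecs.get(taskId);
        if (lastBeat == null) {
            return justLaunched; // no heartbeat yet is acceptable only during the launch window
        }
        return (nowSecs - lastBeat) <= timeout;
    }
}
```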