(NOTE: this page is based on the 0.7.1 code; many things have changed since then, including a split between tasks and executors, and a reorganization of the code under storm-client/src rather than src/.)
This page explains in detail the lifecycle of a topology: running the "storm jar" command, uploading the topology to Nimbus, supervisors starting and stopping workers, and workers and tasks setting themselves up. It also explains how Nimbus monitors topologies and how topologies are shut down when they are killed.
First, a couple of important notes about topologies:
system-topology! is used in two places:
StormSubmitter later.
When your code uses StormSubmitter.submitTopology, StormSubmitter takes the following actions:
StormSubmitter uploads the jar if it hasn't been uploaded before.
beginFileUpload returns a path in Nimbus's inbox
uploadChunk
finishFileUpload is called when it's finished uploading
StormSubmitter calls submitTopology on the Nimbus thrift interface
submitTopology call takes in the Nimbus inbox path where the jar was uploaded
Nimbus receives the topology submission.
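The upload calls named above can be modeled with a small in-memory sketch. This is not Storm's code: `FakeNimbus`, `upload_jar`, the inbox path, the file naming, and the `chunk_size` default are all hypothetical stand-ins chosen for illustration. Only the three call names (beginFileUpload, uploadChunk, finishFileUpload) come from the text.

```python
class FakeNimbus:
    """In-memory stand-in for the Nimbus upload calls (hypothetical, for illustration)."""

    def __init__(self, inbox_dir="/nimbus/inbox"):
        self.inbox_dir = inbox_dir   # assumed inbox location
        self._open = {}              # path -> bytearray still being written
        self.finished = {}           # path -> bytes of completed uploads
        self._counter = 0

    def beginFileUpload(self):
        # Hand the client a fresh path inside the inbox to upload into.
        self._counter += 1
        path = f"{self.inbox_dir}/stormjar-{self._counter}.jar"
        self._open[path] = bytearray()
        return path

    def uploadChunk(self, path, chunk):
        # Append one chunk to the upload in progress.
        self._open[path].extend(chunk)

    def finishFileUpload(self, path):
        # Close the upload; the file is now complete in the inbox.
        self.finished[path] = bytes(self._open.pop(path))


def upload_jar(nimbus, jar_bytes, chunk_size=1024):
    """Upload jar_bytes in fixed-size chunks and return the inbox path."""
    path = nimbus.beginFileUpload()
    for offset in range(0, len(jar_bytes), chunk_size):
        nimbus.uploadChunk(path, jar_bytes[offset:offset + chunk_size])
    nimbus.finishFileUpload(path)
    return path
```

The returned path is what the client would then pass along with the topology submission, which is why the upload has to finish before submitTopology is called.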
Nimbus normalizes the topology configuration. The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly.
Nimbus sets up the static state for the topology.
setup-storm-static writes task -> component mapping into ZK
setup-heartbeats creates a ZK "directory" in which tasks can heartbeat
Nimbus calls mk-assignment to assign tasks to machines
master-code-dir: Used by supervisors to download the correct jars/configs for the topology from Nimbus.
task->node+port: A map from a task id to the worker that task should be running on. (A worker is identified by a node/port pair.)
node->host: A map from node id to hostname. Workers use this to find out which machines to connect to in order to communicate with other workers. Node ids are used to identify supervisors so that multiple supervisors can be run on one machine. One place this is done is with Mesos integration.
task->start-time-secs: A map from task id to the timestamp at which Nimbus launched that task. Nimbus uses this when monitoring topologies, as tasks are given a longer timeout to heartbeat when they're first launched (the launch timeout is configured by the "nimbus.task.launch.secs" config).
Once topologies are assigned, they're initially in a deactivated mode. start-storm writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts.
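As a rough illustration, the four assignment fields described above can be modeled as a small record, along with the longer launch timeout. This is a sketch, not Nimbus's implementation: the `Assignment` class, both helper functions, and the example values are hypothetical. The rule that the launch timeout applies until a task's first heartbeat, and the separate regular timeout (`timeout_secs`), are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Assignment:
    master_code_dir: str            # where supervisors fetch jars/configs from
    task_to_node_port: dict         # task id -> (node id, port)
    node_to_host: dict              # node id -> hostname
    task_to_start_time_secs: dict   # task id -> launch timestamp (seconds)


def worker_address(assignment, task_id):
    """Resolve a task id to the (hostname, port) of the worker that runs it."""
    node, port = assignment.task_to_node_port[task_id]
    return assignment.node_to_host[node], port


def task_timed_out(assignment, task_id, last_heartbeat_secs, now_secs,
                   launch_secs, timeout_secs):
    """Assumed rule: before the first heartbeat, measure from the launch time
    against the longer launch timeout; afterwards, measure from the last
    heartbeat against the regular timeout."""
    if last_heartbeat_secs is None:
        started = assignment.task_to_start_time_secs[task_id]
        return now_secs - started > launch_secs
    return now_secs - last_heartbeat_secs > timeout_secs
```

Keeping node ids separate from hostnames is what lets two node ids map to the same host, as in the multiple-supervisors-per-machine case mentioned above.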
TODO cluster state diagram (show all nodes and what's kept everywhere)
Supervisor runs two functions in the background:
synchronize-supervisor: This is called whenever assignments in Zookeeper change and also every 10 seconds.
sync-processes: Reads from the LFS what synchronize-supervisor wrote and compares that to what's actually running on the machine. It then starts/stops worker processes as necessary to synchronize.
Worker processes start up through the mk-worker function
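The comparison that sync-processes performs can be sketched as a diff between the desired state and the running state. This is a simplified illustration, not the supervisor's code: `plan_worker_changes` is a hypothetical name, and modeling both states as a map from port to topology id is an assumption.

```python
def plan_worker_changes(desired, running):
    """Compare desired vs. running workers, both given as {port: topology_id}.

    Returns (ports_to_start, ports_to_stop). A port whose running topology
    differs from the desired one appears in both lists, so the old worker is
    stopped and a new one is started in its place.
    """
    to_stop = sorted(port for port, topo in running.items()
                     if desired.get(port) != topo)
    to_start = sorted(port for port, topo in desired.items()
                      if running.get(port) != topo)
    return to_start, to_stop
```

For example, with `desired = {6700: "topo-a", 6701: "topo-b"}` and `running = {6701: "topo-old", 6702: "topo-c"}`, the plan starts workers on ports 6700 and 6701 and stops those on 6701 and 6702.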
storm-active-atom variable. This variable is used by tasks to determine whether or not to call nextTuple on the spouts.
Tasks are set up through the mk-task function
reassign-topology through reassign-transition
reassign-topology calls mk-assignments, the same function used to assign the topology the first time. mk-assignments is also capable of incrementally updating a topology.
mk-assignments checks heartbeats and reassigns workers as necessary
do-cleanup function which will clean up the heartbeat dir and the jars/configs stored locally.