聊聊storm supervisor的启动

本文主要研究一下storm supervisor的启动

Supervisor.launch

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Supervisor.java

/**
 * Launch the supervisor
 */
 public void launch() throws Exception {
 LOG.info("Starting Supervisor with conf {}", conf);
 String path = ConfigUtils.supervisorTmpDir(conf);
 FileUtils.cleanDirectory(new File(path));
 Localizer localizer = getLocalizer();
 SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
 hb.run();
 // should synchronize supervisor so it doesn't launch anything after being down (optimization)
 Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
 heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
 this.eventManager = new EventManagerImp(false);
 this.readState = new ReadClusterState(this);
 
 Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
 Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
 if (portToAssignments != null) {
 Map<String, LocalAssignment> assignments = new HashMap<>();
 for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
 assignments.put(la.get_topology_id(), la);
 }
 for (String topoId : downloadedTopoIds) {
 LocalAssignment la = assignments.get(topoId);
 if (la != null) {
 SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
 } else {
 LOG.warn("Could not find an owner for topo {}", topoId);
 }
 }
 }
 // do this after adding the references so we don't try to clean things being used
 localizer.startCleaner();
 UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
 if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
 // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
 // to date even if callbacks don't all work exactly right
 eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
 // Blob update thread. Starts with 30 seconds delay, every 30 seconds
 blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
 // supervisor health check
 eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
 }
 LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
 }
  • supervisor launch的时候new了一个ReadClusterState

ReadClusterState

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ReadClusterState.java

public ReadClusterState(Supervisor supervisor) throws Exception {
 this.superConf = supervisor.getConf();
 this.stormClusterState = supervisor.getStormClusterState();
 this.syncSupEventManager = supervisor.getEventManger();
 this.assignmentVersions = new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());
 this.assignmentId = supervisor.getAssignmentId();
 this.iSuper = supervisor.getiSupervisor();
 this.localizer = supervisor.getAsyncLocalizer();
 this.host = supervisor.getHostName();
 this.localState = supervisor.getLocalState();
 this.clusterState = supervisor.getStormClusterState();
 this.cachedAssignments = supervisor.getCurrAssignment();
 
 this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
 
 @SuppressWarnings("unchecked")
 List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
 for (Number port: ports) {
 slots.put(port.intValue(), mkSlot(port.intValue()));
 }
 
 try {
 Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
 for (Slot slot: slots.values()) {
 String workerId = slot.getWorkerId();
 if (workerId != null) {
 workers.remove(workerId);
 }
 }
 if (!workers.isEmpty()) {
 supervisor.killWorkers(workers, launcher);
 }
 } catch (Exception e) {
 LOG.warn("Error trying to clean up old workers", e);
 }
 //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
 try {
 localizer.cleanupUnusedTopologies();
 } catch (Exception e) {
 LOG.warn("Error trying to clean up old topologies", e);
 }
 
 for (Slot slot: slots.values()) {
 slot.start();
 }
 }
 private Slot mkSlot(int port) throws Exception {
 return new Slot(localizer, superConf, launcher, host, port,
 localState, clusterState, iSuper, cachedAssignments);
 }
  • 这里读取SUPERVISOR_SLOTS_PORTS(supervisor.slots.ports),默认是[6700,6701,6702,6703]
  • 通过ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext())创建ContainerLauncher
  • 根据slots的port配置调用mkSlot创建slot,最后挨个调用slot的start,启动slot线程

ContainerLauncher.make

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ContainerLauncher.java

/**
 * Factory to create the right container launcher 
 * for the config and the environment.
 * @param conf the config
 * @param supervisorId the ID of the supervisor
 * @param sharedContext Used in local mode to let workers talk together without netty
 * @return the proper container launcher
 * @throws IOException on any error
 */
 public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException {
 if (ConfigUtils.isLocalMode(conf)) {
 return new LocalContainerLauncher(conf, supervisorId, sharedContext);
 }
 
 if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 return new RunAsUserContainerLauncher(conf, supervisorId);
 }
 return new BasicContainerLauncher(conf, supervisorId);
 }
  • 这里根据配置来创建ContainerLauncher的不同子类,local模式的创建的是LocalContainerLauncher;要求runAsUser的创建的是RunAsUserContainerLauncher;其他的创建的是BasicContainerLauncher

Slot

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java

public void run() {
 try {
 while(!done) {
 Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());
 Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);
 
 DynamicState nextState = 
 stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
 .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions), staticState);
 if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
 LOG.info("STATE {} -> {}", dynamicState, nextState);
 }
 //Save the current state for recovery
 if (!equivalent(nextState.currentAssignment, dynamicState.currentAssignment)) {
 LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, nextState.currentAssignment);
 saveNewAssignment(nextState.currentAssignment);
 }
 if (equivalent(nextState.newAssignment, nextState.currentAssignment) &&
 nextState.currentAssignment != null && nextState.currentAssignment.get_owner() == null &&
 nextState.newAssignment != null && nextState.newAssignment.get_owner() != null) {
 //This is an odd case for a rolling upgrade where the user on the old assignment may be null,
 // but not on the new one. Although in all other ways they are the same.
 // If this happens we want to use the assignment with the owner.
 LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner());
 saveNewAssignment(nextState.newAssignment);
 nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
 }
 
 // clean up the profiler actions that are not being processed
 removed.removeAll(dynamicState.profileActions);
 removed.removeAll(dynamicState.pendingStopProfileActions);
 for (TopoProfileAction action: removed) {
 try {
 clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
 } catch (Exception e) {
 LOG.error("Error trying to remove profiling request, it will be retried", e);
 }
 }
 Set<TopoProfileAction> orig, copy;
 do {
 orig = profiling.get();
 copy = new HashSet<>(orig);
 copy.removeAll(removed);
 } while (!profiling.compareAndSet(orig, copy));
 dynamicState = nextState;
 }
 } catch (Throwable e) {
 if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
 LOG.error("Error when processing event", e);
 Utils.exitProcess(20, "Error when processing an event");
 }
 }
 }
 private void saveNewAssignment(LocalAssignment assignment) {
 synchronized(staticState.localState) {
 Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap();
 if (assignments == null) {
 assignments = new HashMap<>();
 }
 if (assignment == null) {
 assignments.remove(staticState.port);
 } else {
 assignments.put(staticState.port, assignment);
 }
 staticState.localState.setLocalAssignmentsMap(assignments);
 }
 Map<Long, LocalAssignment> update = null;
 Map<Long, LocalAssignment> orig = null;
 do {
 Long lport = new Long(staticState.port);
 orig = cachedCurrentAssignments.get();
 update = new HashMap<>(orig);
 if (assignment == null) {
 update.remove(lport);
 } else {
 update.put(lport, assignment);
 }
 } while (!cachedCurrentAssignments.compareAndSet(orig, update));
 }
 static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception {
 LOG.debug("STATE {}", dynamicState.state);
 switch (dynamicState.state) {
 case EMPTY:
 return handleEmpty(dynamicState, staticState);
 case RUNNING:
 return handleRunning(dynamicState, staticState);
 case WAITING_FOR_WORKER_START:
 return handleWaitingForWorkerStart(dynamicState, staticState);
 case KILL_AND_RELAUNCH:
 return handleKillAndRelaunch(dynamicState, staticState);
 case KILL:
 return handleKill(dynamicState, staticState);
 case WAITING_FOR_BASIC_LOCALIZATION:
 return handleWaitingForBasicLocalization(dynamicState, staticState);
 case WAITING_FOR_BLOB_LOCALIZATION:
 return handleWaitingForBlobLocalization(dynamicState, staticState);
 default:
 throw new IllegalStateException("Code not ready to handle a state of "+dynamicState.state);
 }
 }
  • 不断循环stateMachineStep方法切换state
  • 当state是WAITING_FOR_BLOB_LOCALIZATION时,会触发handleWaitingForBlobLocalization

handleWaitingForBlobLocalization

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java

/**
 * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
 * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
 * PRECONDITION: The slot should be empty
 * @param dynamicState current state
 * @param staticState static data
 * @return the next state
 * @throws Exception on any error
 */
 static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
 assert(dynamicState.pendingLocalization != null);
 assert(dynamicState.pendingDownload != null);
 assert(dynamicState.container == null);
 
 //Ignore changes to scheduling while downloading the topology blobs
 // We don't support canceling the download through the future yet,
 // so to keep everything in sync, just wait
 try {
 dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
 //Downloading of all blobs finished.
 if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
 //Scheduling changed
 staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
 return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
 }
 Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
 return dynamicState.withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START).withPendingLocalization(null, null);
 } catch (TimeoutException e) {
 //We waited for 1 second loop around and try again....
 return dynamicState;
 }
 }
  • 这里通过staticState.containerLauncher.launchContainer去启动container

BasicContainerLauncher.launchContainer

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java

@Override
 public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
 LocalContainer ret = new LocalContainer(_conf, _supervisorId, port, assignment, _sharedContext);
 ret.setup();
 ret.launch();
 return ret;
 }
  • launchContainer的时候,先调用setup,再调用launch方法

Container.setup

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Container.java

/**
 * Setup the container to run. By default this creates the needed directories/links in the
 * local file system
 * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and
 * placed in the appropriate locations
 * @throws IOException on any error
 */
 protected void setup() throws IOException {
 _type.assertFull();
 if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
 LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
 _supervisorId, _port, _workerId);
 throw new IllegalStateException("Not all needed files are here!!!!");
 } 
 LOG.info("Setting up {}:{}", _supervisorId, _workerId);
 _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
 _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
 _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));
 
 File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
 if (!_ops.fileExists(workerArtifacts)) {
 _ops.forceMkdir(workerArtifacts);
 _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts);
 }
 
 String user = getWorkerUser();
 writeLogMetadata(user);
 saveWorkerUser(user);
 createArtifactsLink();
 createBlobstoreLinks();
 }
  • setup主要做一些创建目录或链接的准备工作

BasicContainer.launch

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainer.java

public void launch() throws IOException {
 _type.assertFull();
 LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
 _supervisorId, _port, _workerId);
 String logPrefix = "Worker Process " + _workerId;
 ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
 _exitedEarly = false;
 
 final WorkerResources resources = _assignment.get_resources();
 final int memOnheap = getMemOnHeap(resources);
 final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
 final String jlp = javaLibraryPath(stormRoot, _conf);
 
 List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);
 Map<String, String> topEnvironment = new HashMap<String, String>();
 @SuppressWarnings("unchecked")
 Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
 if (environment != null) {
 topEnvironment.putAll(environment);
 }
 topEnvironment.put("LD_LIBRARY_PATH", jlp);
 LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
 String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
 launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
 }
 /**
 * Launch the worker process (non-blocking)
 * 
 * @param command
 * the command to run
 * @param env
 * the environment to run the command
 * @param processExitcallback
 * a callback for when the process exits
 * @param logPrefix
 * the prefix to include in the logs
 * @param targetDir
 * the working directory to run the command in
 * @return true if it ran successfully, else false
 * @throws IOException
 * on any error
 */
 protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
 ExitCodeCallback processExitCallback, File targetDir) throws IOException {
 SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
 }
  • 这里通过mkLaunchCommand来准备创建命令
  • 然后通过SupervisorUtils.launchProcess启动worker进程

mkLaunchCommand

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java

/**
 * Create the command to launch the worker process
 * @param memOnheap the on heap memory for the worker
 * @param stormRoot the root dist dir for the topology
 * @param jlp java library path for the topology
 * @return the command to run
 * @throws IOException on any error.
 */
 private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
 final String jlp) throws IOException {
 final String javaCmd = javaCmd("java");
 final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
 final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
 final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
 
 List<String> classPathParams = getClassPathParams(stormRoot);
 List<String> commonParams = getCommonParams();
 
 List<String> commandList = new ArrayList<>();
 //Log Writer Command...
 commandList.add(javaCmd);
 commandList.addAll(classPathParams);
 commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
 commandList.addAll(commonParams);
 commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.
 //Worker Command...
 commandList.add(javaCmd);
 commandList.add("-server");
 commandList.addAll(commonParams);
 commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
 commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
 commandList.addAll(substituteChildopts(OR(
 _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
 _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
 commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
 commandList.add("-Djava.library.path=" + jlp);
 commandList.add("-Dstorm.conf.file=" + stormConfFile);
 commandList.add("-Dstorm.options=" + stormOptions);
 commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
 commandList.addAll(classPathParams);
 commandList.add("org.apache.storm.daemon.worker");
 commandList.add(_topologyId);
 commandList.add(_supervisorId);
 commandList.add(String.valueOf(_port));
 commandList.add(_workerId);
 
 return commandList;
 }
  • 启动参数实例
/usr/lib/jvm/java-1.8-openjdk/jre/bin/java -server -Dlogging.sensitivity=S3 -Dlogfile.name=worker.log -Dstorm.home=/apache-storm-1.2.2 -Dworkers.artifacts=/logs/workers-artifacts -Dstorm.id=DemoTopology-1-1539163962 -Dworker.id=f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf -Dworker.port=6700 -Dstorm.log.dir=/logs -Dlog4j.configurationFile=/apache-storm-1.2.2/log4j2/worker.xml -DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector -Dstorm.local.dir=/data -Xmx768m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -Djava.library.path=/data/supervisor/stormdist/DemoTopology-1-1539163962/resources/Linux-amd64:/data/supervisor/stormdist/DemoTopology-1-1539163962/resources:/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -Dstorm.options=storm.local.hostname%3D192.168.99.100 -Djava.io.tmpdir=/data/workers/f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf/tmp -cp /apache-storm-1.2.2/lib/*:/apache-storm-1.2.2/extlib/*:/conf:/data/supervisor/stormdist/DemoTopology-1-1539163962/stormjar.jar org.apache.storm.daemon.worker DemoTopology-1-1539163962 8dd6dc7f-95cb-49f9-9bd1-f0d638fe6fc6 6700 f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf
  • org.apache.storm.daemon.worker"的路径为storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/worker.clj

SupervisorUtils.launchProcess

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/SupervisorUtils.java

/**
 * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
 * callback.
 * @param command the command to be executed in the new process
 * @param environment the environment to be applied to the process. Can be
 * null.
 * @param logPrefix a prefix for log entries from the output of the process.
 * Can be null.
 * @param exitCodeCallback code to be called passing the exit code value
 * when the process completes
 * @param dir the working directory of the new process
 * @return the new process
 * @throws IOException
 * @see java.lang.ProcessBuilder
 */
 public static Process launchProcess(List<String> command,
 Map<String,String> environment,
 final String logPrefix,
 final ExitCodeCallback exitCodeCallback,
 File dir)
 throws IOException {
 ProcessBuilder builder = new ProcessBuilder(command);
 Map<String,String> procEnv = builder.environment();
 if (dir != null) {
 builder.directory(dir);
 }
 builder.redirectErrorStream(true);
 if (environment != null) {
 procEnv.putAll(environment);
 }
 final Process process = builder.start();
 if (logPrefix != null || exitCodeCallback != null) {
 Utils.asyncLoop(new Callable<Object>() {
 public Object call() {
 if (logPrefix != null ) {
 Utils.readAndLogStream(logPrefix,
 process.getInputStream());
 }
 if (exitCodeCallback != null) {
 try {
 process.waitFor();
 exitCodeCallback.call(process.exitValue());
 } catch (InterruptedException ie) {
 LOG.info("{} interrupted", logPrefix);
 exitCodeCallback.call(-1);
 }
 }
 return null; // Run only once.
 }
 });
 }
 return process;
 }
  • 这里通过ProcessBuilder来启动进程

小结

  • storm的supervisor启动的时候,会创建ContainerLauncher以及根据SUPERVISOR_SLOTS_PORTS(supervisor.slots.ports)创建slots
  • slot线程会不断循环state,在WAITING_FOR_BLOB_LOCALIZATION的时候使用ContainerLauncher的launchContainer创建Container并launch
  • container launch的时候通过SupervisorUtils.launchProcess(使用ProcessBuilder)启动worker进程

聊聊storm supervisor的启动

相关推荐