流程图

image.png

image.png

启动流程分析

——————程序员client—————————

  1. 客户端运行storm nimbus时,会调用storm的python脚本,该脚本中为每个命令编写一个方法,每个方法都可以生成一条相应的java命令。
    命令格式如下:java -server xxxx.ClassName -args
  1. nimbus---> Running: /export/servers/jdk/bin/java -server backtype.storm.daemon.nimbus
  2. supervisor---> Running: /export/servers/jdk/bin/java -server backtype.storm.daemon.supervisor

———————nimbus——————————-

  1. nibums启动之后,接受客户端提交任务
    命令格式:storm jar xxx.jar xxx驱动类 参数
  1. Running: /export/servers/jdk/bin/java -client -Dstorm.jar=/export/servers/storm/examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount-28
  1. 该命令会执行 storm-starter-topologies-0.9.6.jar 中的storm-starter-topologies-0.9.6.jarmain方法,main方法中会执行以下代码
  1. StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());
  1. topologyBuilder.createTopology(),会将编写的spout对象和bolt对象进行序列化。
  2. 会将用户的jar上传到 nimbus物理节点的 /export/data/storm/workdir/nimbus/inbox目录下。并且改名,改名的规则是添加了一个UUID字符串。
  3. nimbus物理节点的 /export/data/storm/workdir/nimbus/stormdist目录下。有当前正在运行的topologyjar包和配置文件,序列化对象文件。
  1. nimbus接受到任务之后,会将任务进行分配,分配会产生一个assignment对象,该对象会保存到zk中,目录是/storm/assignments ,该目录只保存正在运行的topology任务。

————supervisor—————————

  1. supervisor通过watch机制,感知到nimbus在zk上的任务分配信息,从zk上拉取任务信息,分辨出属于自己任务。
  1. ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]
  1. supervisor 根据自己的任务信息,启动自己的worker,并分配一个端口
  1. '/export/servers/jdk/bin/java' '-server' '-Xmx768m' export/data/storm/workdir/supervisor/stormdist/wordcount1-3-1461683066/stormjar.jar' 'backtype.storm.daemon.worker' 'wordcount1-3-1461683066' 'a69bb8fc-e08e-4d55-b51f-e539b066f90b' '6701' '9fac2805-7d2b-4e40-aabc-1c85c9856d64'

————-worker———————————

  1. worker启动之后,连接zk,拉取任务
  1. ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]
  1. 假设任务信息:
  2. 1--->spout---type:spout
  3. 2--->bolt ---type:bolt
  4. 3--->acker---type:bolt
  5. 得到对象有几种方式?
  6. new ClassName 创建对象、
  7. class.forName 反射对象、
  8. clone 克隆对象、
  9. 序列化反序列化
  10. worker通过反序列化,得到程序员自己定义的spoutbolt对象。
  1. worker根据任务类型,分别执行spout任务或者bolt任务。
    spout的生命周期是:open、nextTuple、outPutFiled
    bolt的生命周期是:prepare、execute(tuple)、outPutFiled

代码流程

jstorm supervisor如何启动worker,worker如何启动task

  1. 下载Jstorm源码,在源码包下找到 daemon包,在这个包下有三个子包,分别是nimbus,supervisor,worker。
  2. 通过架构图,我们已知nimbus分配任务,并将任务信息写入到zk上,supervisor读取zk上的任务后启动自己的worker。所以我们分析supervisor如何启动worker,worker如何启动task。
  3. supervisor如何启动worker。打开 com.alibaba.jstorm.daemon.supervisor.Supervisor 发现supervisor有几个方法,方法中有个mkSupervisor方法。
  4. 进去Supervisor中的mkSupervisor方法,在第144行有以下的代码,该代码创建了SyncSupervisorEvent 对象。
  1. SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);
  1. SyncSupervisorEvent对象实现了RunnableCallback接口,该接口有个run方法会被定时执行。在run方法的191行,有代码如下,主要是要supervisor获取到任务信息,要开始准备启动worker了。
  1. syncProcesses.run(zkAssignment, downloadFailedTopologyIds);
  1. syncProcesses是com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent的引用变量,该类中有个自定义的run方法中有段代码如下,调用的startNewWorkers方法
  1. startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
  1. SyncProcessEvent的startNewWorkers方法有代码片段如下,主要是根据集群模式启动不同模式下的worker。我们跟踪分布式集群模式下的worker启动。
  1. for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
  2. if (clusterMode.equals(“distributed”)) {
  3. launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment);
  4. } else if (clusterMode.equals(“local”)) {
  5. launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, workerThreadPids);
  6. }
  7. }
  1. 在分布式模式下worker启动最终会调用一个类似于java -server xxx.worker 启动worker。由于第7步中,有个for循环,该for循环会迭代出属于当前supervisor的所有worker任务并启动。
  1. JStormUtils.launchProcess(cmd, environment, true);
  1. java -server xxx.worker,命令执行之后,会执行Worker的mian方法。worker的main方法有代码如下,其实调用了worker自己内部的静态方法,叫做mk_worker方法。
  1. WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path);
  2. sd.join();
  1. mk_worker静态方法,会执行以下代码,创建一个worker的实例,并立即执行execute方法。
  1. Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
  2. return w.execute();
  1. execute方法会执行以下代码创建一个RefreshConnections 的实例。
  1. RefreshConnections refreshConn = makeRefreshConnections();
  1. makeRefreshConnections 方法会执行以下代码创建一个RefreshConnections 实例。
  1. RefreshConnections refresh_connections = new RefreshConnections(workerData);
  1. RefreshConnections 是继承了 RunnableCallback,该实例的会有一个run方法会被定时执行。run方法中有以下代码,其中createTasks(addedTasks)方法用来创建Task任务。
  1. shutdownTasks(removedTasks);
  2. createTasks(addedTasks);
  3. updateTasks(updatedTasks);
  1. createTasks方法有代码如下,循环启动属于该worker的Task任务,启动Task任务主要调用Task.mk_task(workerData, taskId);
  1. for (Integer taskId : tasks) {
  2. try {
  3. TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId);
  4. workerData.addShutdownTask(shutdown);
  5. } catch (Exception e) {
  6. LOG.error(“Failed to create task-” + taskId, e);
  7. throw new RuntimeException(e);
  8. }
  9. }
  1. Task.mk_task(workerData, taskId)方法实现如下,创建一个Task对象并立即调用execute方法。
  1. Task t = new Task(workerData, taskId);
  2. return t.execute();
  1. execute方法实现如下,用来初始化一个Executor,我们知道在默认情况下一个task等于一个executor。
  1. RunnableCallback baseExecutor = prepareExecutor();
  1. 进入prepareExecutor()方法,代码如下,发现代码调用了mkExecutor方法。
  1. final BaseExecutors baseExecutor = mkExecutor();
  1. mkExecutor方法,代码如下,如果当前taskObj是Bolt就创建Bolt的executor,如果当前taskObj是Spout就创建相应的Spout executor。
  1. public BaseExecutors mkExecutor() {
  2. BaseExecutors baseExecutor = null;
  3. if (taskObj instanceof IBolt) {
  4. baseExecutor = new BoltExecutors(this);
  5. } else if (taskObj instanceof ISpout) {
  6. if (isSingleThread(stormConf) == true) {
  7. baseExecutor = new SingleThreadSpoutExecutors(this);
  8. } else {
  9. baseExecutor = new MultipleThreadSpoutExecutors(this);
  10. }
  11. }
  12. return baseExecutor;
  13. }
  1. 创建完了executor,现在有两条线,分别是bolt executor和spout executor。以bolt executor 为例,这个executor会实现Disruptor的EventHandler接口。 接口onevent方法需要实现,实现代码中会调用processTupleEvent()方法。下面节选onevent中的部分代码。
  1. if (event instanceof Tuple) {
  2. processControlEvent();
  3. processTupleEvent((Tuple) event);
  4. } else if (event instanceof BatchTuple) {
  5. for (Tuple tuple : ((BatchTuple) event).getTuples()) {
  6. processControlEvent();
  7. processTupleEvent((Tuple) tuple);
  8. }
  9. }
  1. 进入processTupleEvent方法,发现有代码如下,其实最终是调用了bolt.execute()方法。
  1. private void processTupleEvent(Tuple tuple) {
  2. try {
  3. if (xxx) {
  4. backpressureTrigger.handle(tuple);
  5. } else {
  6. bolt.execute(tuple);
  7. }
  8. } catch (Throwable e) {
  9. error = e;
  10. LOG.error(“bolt execute error “, e);
  11. report_error.report(e);
  12. }
  13. }

序列化

当topology发布的时候,所有的bolt和spout组件首先会进行序列化,然后通过网络发送到集群中.

如果spout或者bolt在序列化之前(比如说在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化时会抛出NotSerializableException异常,topology就会部署失败