Storm是由BackType开发的一个实时、分布式的计算平台,后来Twitter收购了BackType并将其源代码开放。在GitHub上(https://github.com/nathanmarz/storm),我们可以获得Storm的最新源代码及相关文档。

1.1 Storm的总体结构

Storm中会涉及的术语包括Stream、Spout、Bolt、Worker、Executor、Task、Stream Grouping和Topology,现简要介绍如下。

  • Stream是被处理的数据。
  • Spout是数据源。
  • Bolt封装了数据处理逻辑。
  • Worker是工作进程。一个工作进程中可以含有一个或多个Executor线程。
  • Executor是运行Spout或Bolt处理逻辑的线程。
  • Task是Storm中的最小处理单元。一个Executor中可以包含一个或多个Task,消息的分发都是从一个Task到另一个Task进行的。
  • Stream Grouping定义了消息分发策略,定义了Bolt节点以何种方式接收数据。消息可以随机分配(Shuffle Grouping,随机分组),或者根据字段值分配(Fields Grouping,字段分组),或者广播(All Grouping,全部分组),或者总是发给同一个Task(Global Grouping,全局分组),也可以不关心数据是如何分组的(None Grouping,无分组),或者由自定义逻辑来决定,即由消息发送者决定应该由消息接收者组件的哪个Task来处理该消息(Direct Grouping,直接分组)。
  • Topology是由消息分组方式连接起来的Spout和Bolt节点网络,它定义了运算处理的拓扑结构,处理的是不断流动的消息。除非杀掉Topology,否则它将永远运行下去。

Storm的基本结构如图1-1所示。
Storm集群中存在两种类型的节点:运行Nimbus服务的主节点和运行Supervisor服务的工作节点。Storm集群由一个主节点和多个工作节点组成。主节点上运行一个名为“Nimbus”的守护进程,用于分配代码、布置任务及检测故障。每个工作节点则运行一个名为“Supervisor”的守护进程,用于监听工作、开始并终止工作进程。
image.png
图 1-1 Storm的基本结构
Nimbus和Supervisor都能快速失败并恢复,而且它们是无状态的,其元数据存储在ZooKeeper中,这使得系统具有很高的容错性。Nimbus与Supervisor之间的协调工作是通过ZooKeeper来完成的,它是Apache下面的开源项目,用于分布式系统的同步等,详情可参考http://zookeeper.apacheorg/
Worker由Supervisor负责启动,一个Worker中可以有多个Executor线程,每个Executor中又可包含一个或多个Task。Task为Storm中的最小处理单元,它是Topology组件诸多并行度中的一个。每个Executor都会启动一个消息循环线程,用以接收、处理和发送消息。当Executor收到属于其下某一Task的消息后,就会调用该Task对应的处理逻辑对消息进行处理。
在逻辑上,Storm中消息的来源节点被称为Spout,消息的处理节点被称为Bolt。在系统中,可以存在多个Spout及Bolt,且每个Spout或Bolt都可设置不同的并行度,示例如图1-2所示。
image.png
图 1-2 示例图

1.2 Storm的元数据

Storm采用ZooKeeper来存储Nimbus、Supervisor、Worker以及Executor之间共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复。因此Storm的模块是无状态的,这是保证其可靠性及可扩展性的基础。了解元数据以及Storm如何使用这些元数据,有助于我们更好地理解Storm的设计。

1.2.1 元数据介绍

Storm在ZooKeeper中存储数据的目录结构如图1-3所示,这是一个根路径为/storm的树,树中的每一个节点代表ZooKeeper中的一个节点(znode),每一个叶子节点是Storm真正存储数据的地方。在图1-3中,从根节点到叶子节点的全路径代表了该数据在ZooKeeper中的存储路径,该路径可被用来写入或获取数据。
image.png
图 1-3 Storm在ZooKeeper中存储的数据
下面分别介绍ZooKeeper中每项数据的具体含义。

  • /storm/workerbeats//node-port:它存储由node和port指定的Worker的运行状态和一些统计信息,主要包括storm-id(也即topology-id)、当前Worker上所有Executor的统计信息(如发送的消息数目、接收的消息数目等)、当前Worker的启动时间以及最后一次更新这些信息的时间。在一个topology-id下面,可能有多个node-port节点。它的内容在运行过程中会被更新。
  • /storm/storms/:它存储Topology本身的信息,包括它的名字、启动时间、运行状态、要使用的Worker数目以及每个组件的并行度设置。它的内容在运行过程中是不变的。
  • /storm/assignments/:它存储了Nimbus为每个Topology分配的任务信息,包括该Topology在Nimbus机器本地的存储目录、被分配到的Supervisor机器到主机名的映射关系、每个Executor运行在哪个Worker上以及每个Executor的启动时间。该节点的数据在运行过程中会被更新。
  • /storm/supervisors/:它存储Supervisor机器本身的运行统计信息,主要包括最近一次更新时间、主机名、supervisor-id、已经使用的端口列表、所有的端口列表以及运行时间。该节点的数据在运行过程中也会被更新。
  • /storm/errors///e:它存储运行过程中每个组件上发生的错误信息。是一个递增的序列号,每一个组件最多只会保留最近的10条错误信息。它的内容在运行过程中是不变的(但是有可能被删除)。

    1.2.2 Storm怎么使用这些元数据

    了解了存储在ZooKeeper中的数据,我们自然想知道Storm是如何使用这些元数据的。例如,这些数据何时被写入、更新或删除,这些数据都是由哪种类型的节点(Nimbus、Supervisor、Worker或者Executor)来维护的。接下来,我们就简单介绍一下这些关系,希望读者能对Storm的整体设计实现有更深一层的认识。带上这些知识,能让你的Storm源码之路变得更加轻松愉快。
    首先来看一下总体交互图,如图1-4所示。
    image.png
    图 1-4 总体交互图
    这个图描述了Storm中每个节点跟ZooKeeper内元数据之间的读写依赖关系,详细介绍如下。
    1. Nimbus
    Nimbus既需要在ZooKeeper中创建元数据,也需要从ZooKeeper中获取元数据。下面简述图1-4中箭头1和箭头2的作用。

  • 箭头1表示由Nimbus创建的路径,包括:a. /storm/workerbeats/b. /storm/storms/c. /storm/assignments/其中对于路径a,Nimbus只会创建路径,不会设置数据(数据是由Worker设置的,后面会介绍);对于路径b和c,Nimbus在创建它们的时候就会设置数据。a和b只有在提交新Topology的时候才会创建,且b中的数据设置好后就不再变化,c则在第一次为该Topology进行任务分配的时候创建,若任务分配计划有变,Nimbus就会更新它的内容。

  • 箭头2表示Nimbus需要获取数据的路径,包括:a. /storm/workerbeats//node-portb. /storm/supervisors/c. /storm/errors///eNimbus需要从路径a读取当前已被分配的Worker的运行状态。根据该信息,Nimbus可以得知哪些Worker状态正常,哪些需要被重新调度,同时还会获取到该Worker所有Executor统计信息,这些信息会通过UI呈现给用户。从路径b可以获取当前集群中所有Supervisor的状态,通过这些信息可以得知哪些Supervisor上还有空闲的资源可用,哪些Supervisor则已经不再活跃,需要将分配到它的任务分配到其他节点上。从路径c上可以获取当前所有的错误信息并通过UI呈现给用户。集群中可以动态增减机器,机器的增减会引起ZooKeeper中元数据的变化,Nimbus通过不断获取这些元数据信息来调整任务分配,故Storm具有良好的可扩展性。当Nimbus死掉时,其他节点是可以继续工作的,但是不能提交新的Topology,也不能重新进行任务分配和负载调整,因此目前Nimbus还是存在单点的问题。

2. Supervisor
同Nimbus类似,Superviser也要通过ZooKeeper来创建和获取元数据。除此之外,Supervisor还通过监控指定的本地文件来检测由它启动的所有Worker的运行状态。下面简述图1-4中箭头3、箭头4和箭头9的作用。

  • 箭头3表示Supervisor在ZooKeeper中创建的路径是/storm/supervisors/。新节点加入时,会在该路径下创建一个节点。值得注意的是,该节点是一个临时节点(创建ZooKeeper节点的一种模式),即只要Supervisor与ZooKeeper的连接稳定存在,该节点就一直存在;一旦连接断开,该节点则会被自动删除。该目录下的节点列表代表了目前活跃的机器。这保证了Nimbus能及时得知当前集群中机器的状态,这是Nimbus可以进行任务分配的基础,也是Storm具有容错性以及可扩展性的基础。
  • 箭头4表示Supervisor需要获取数据的路径是/storm/assignments/。我们知道它是Nimbus写入的对Topology的任务分配信息,Supervisor从该路径可以获取到Nimbus分配给它的所有任务。Supervisor在本地保存上次的分配信息,对比这两部分信息可以得知分配信息是否有变化。若发生变化,则需要关闭被移除任务所对应的Worker,并启动新的Worker执行新分配的任务。Nimbus会尽量保持任务分配的稳定性,我们将在第7章中进行详细分析。
  • 箭头9表示Supervisor会从LocalState(相关内容会在第4章中介绍)中获取由它启动的所有Worker的心跳信息。Supervisor会每隔一段时间检查一次这些心跳信息,如果发现某个Worker在这段时间内没有更新心跳信息,表明该Worker当前的运行状态出了问题。这时Supervisor就会杀掉这个Worker,原本分配给这个Worker的任务也会被Nimbus重新分配。

3. Worker
Worker也需要利用ZooKeeper来创建和获取元数据,同时它还需要利用本地的文件来记录自己的心跳信息。
下面简述图4-1中箭头5、箭头6和箭头8的作用。

  • 箭头5表示Worker在ZooKeeper中创建的路径是/storm/workerbeats//node- port。在Worker启动时,将创建一个与其对应的节点,相当于对自身进行注册。需要注意的是,Nimbus在Topology被提交时只会创建路径/storm/workerbeats/,而不会设置数据,数据则留到Worker启动之后由Worker创建。这样安排的目的之一是为了避免多个Worker同时创建路径时所导致的冲突。
  • 箭头6表示Worker需要获取数据的路径是/storm/assignments/,Worker会从这些任务分配信息中取出分配给它的任务并执行。
  • 箭头8表示Worker在LocalState中保存心跳信息。LocalState实际上将这些信息保存在本地文件中,Worker用这些信息跟Supervisor保持心跳,每隔几秒钟需要更新一次心跳信息。Worker与Supervisor属于不同的进程,因而Storm采用本地文件的方式来传递心跳。

4. Executor
Executor只会利用ZooKeeper来记录自己的运行错误信息,下面简述图4-1中箭头7的作用。
箭头7表示Executor在ZooKeeper中创建的路径是/storm/errors///e。每个Executor会在运行过程中记录发生的错误。
5. 小结
从前面的描述中可以得知,Nimbus、Supervisor以及Worker两两之间都需要维持心跳信息,它们的心跳关系如下。

  • Nimbus和Supervisor之间通过/storm/supervisors/路径对应的数据进行心跳保持。Supervisor创建这个路径时采用的是临时节点模式,所以只要Supervisor死掉,对应路径的数据就会被删掉,Nimbus就会将原本分配给该Supervisor的任务重新分配。
  • Worker跟Nimbus之间通过/storm/workerbeats//node-port中的数据进行心跳保持。Nimbus会每隔一定时间获取该路径下的数据,同时Nimbus还会在它的内存中保存上一次的信息。如果发现某个Worker的心跳信息有一段时间没更新,就认为该Worker已经死掉了,Nimbus会对任务进行重新分配,将分配至该Worker的任务分配给其他Worker。
  • Worker跟Supervisor之间通过本地文件(基于LocalState)进行心跳保持