教程

在本教程中,您将学习如何创建Storm拓扑并将其部署到Storm集群。Java将是使用的主要语言,但是一些示例将使用Python来说明Storm的多语言功能。

Storm集群的组件

Storm集群在表面上类似于Hadoop集群。而在Hadoop上运行“MapReduce作业”,在Storm上运行“拓扑”。“作业”和“拓扑”本身是非常不同的 - 一个关键的区别是MapReduce作业最终完成,而拓扑处理消息永远(或直到你杀死它)。

Storm集群上有两种节点:主节点和工作节点。主节点运行一个名为“Nimbus”的守护进程,类似于Hadoop的“JobTracker”。Nimbus负责在集群周围分发代码,为机器分配任务以及监控故障。

每个工作节点都运行一个名为“Supervisor”的守护进程。主管监听分配给其机器的工作,并根据Nimbus为其分配的内容,根据需要启动和停止工作进程。每个工作进程都执行拓扑的子集;运行拓扑由分布在许多计算机上的许多工作进程组成。

Tutorial - 图1

Nimbus和Supervisors之间的所有协调都是通过Zookeeper集群完成的。此外,Nimbus守护程序和Supervisor守护程序是快速失败和无状态的;所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以kill-9 Nimbus或者主管,他们会像没事一样重新开始。这种设计使Storm集群非常稳定。

Topologies

要在Storm上进行实时计算,您可以创建所谓的“拓扑”。拓扑是计算图。拓扑中的每个节点都包含处理逻辑,节点之间的链接指示数据应如何在节点之间传递。

运行拓扑很简单。首先,将所有代码和依赖项打包到一个jar中。然后,运行如下命令:

  1. storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2

这将使用参数arg1和arg2运行类org.apache.storm.MyTopology。该类的主要功能定义拓扑并将其提交给Nimbus。storm jar部分负责连接到Nimbus并上传jar。

由于拓扑定义只是Thrift结构,而Nimbus是Thrift服务,因此您可以使用任何编程语言创建和提交拓扑。上面的示例是从基于JVM的语言中执行此操作的最简单方法。有关启动和停止拓扑的详细信息,请参阅在生产群集上运行拓扑。

Streams

Storm中的核心抽象是“流”。流是一个无限的元组序列。Storm提供了以分布式和可靠的方式将流转换为新流的原语。例如,您可以将推文流转换为趋势主题流。

Storm为进行流转换提供的基本原语是“spouts”和“bolt”。Spout和bolt具有您实现的接口,用于运行特定于应用程序的逻辑。

Spout是流的来源。例如,spout可以读取Kestrel队列中的元组并将其作为流发出。或者spout可以连接到Twitter API并发出推文流。

Bolt会消耗任意数量的输入流,进行一些处理,并可能发出新的流。复杂的流转换,例如从推文流计算趋势主题流,需要多个步骤,因此需要多个bolt。Bolts可以执行任何操作,包括运行函数,过滤元组,进行流聚合,进行流连接,与数据库对话等等。

spout和bolt网络被打包成一个“拓扑”,这是您提交给Storm集群执行的顶级抽象。拓扑是流转换的图形,其中每个节点都是一个spout或bolt。图中的边缘表示哪些bolt订阅了哪些流。当一个spout或bolt向一个流发出一个元组时,它会将元组发送给订阅该流的每个bolt。

Tutorial - 图2

拓扑中节点之间的链接指示应如何传递元组。例如,如果Spout A和Bolt B之间有链接,从Spout A到Bolt C的链接,以及从Bolt B到Bolt C的链接,那么每次Spout A发出一个元组时,它都会将元组发送给BoltB和Bolt C.所有Bolt B的输出元组也将转向Bolt C。

Storm拓扑中的每个节点并行执行。在拓扑中,您可以为每个节点指定所需的并行度,然后Storm将在集群中生成该数量的线程以执行。

拓扑运行永远,或直到你杀死它。Storm会自动重新分配任何失败的任务。此外,Storm保证不会丢失数据,即使计算机出现故障并且消息丢失也是如此。

Data model

Storm使用元组作为其数据模型。元组是一个命名的值列表,元组中的字段可以是任何类型的对象。开箱即用,Storm支持所有原始类型,字符串和字节数组作为元组字段值。要使用其他类型的对象,您只需要为该类型实现一个序列化程序。

拓扑中的每个节点都必须声明它发出的元组的输出字段。例如,这个bolt声明它发出2元组,字段为“double”和“triple”:

  1. public class DoubleAndTripleBolt extends BaseRichBolt {
  2. private OutputCollectorBase _collector;
  3. @Override
  4. public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
  5. _collector = collector;
  6. }
  7. @Override
  8. public void execute(Tuple input) {
  9. int val = input.getInteger(0);
  10. _collector.emit(input, new Values(val*2, val*3));
  11. _collector.ack(input);
  12. }
  13. @Override
  14. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  15. declarer.declare(new Fields("double", "triple"));
  16. }
  17. }

declareOutputFields函数声明组件的输出字段[“double”,“triple”]。其余的bolt将在后面的章节中解释。

A simple topology

让我们看一个简单的拓扑,更多地探索概念,看看代码是如何形成的。让我们看一下来自storm-starter的ExclamationTopology定义:

  1. TopologyBuilder builder = new TopologyBuilder();
  2. builder.setSpout("words", new TestWordSpout(), 10);
  3. builder.setBolt("exclaim1", new ExclamationBolt(), 3)
  4. .shuffleGrouping("words");
  5. builder.setBolt("exclaim2", new ExclamationBolt(), 2)
  6. .shuffleGrouping("exclaim1");

此拓扑包含一个spout和两个bolt。spout发出单词,每个bolt针对它收到的内容,附加字符串“!!!”。节点排成一行:spout喷射到第一个bolt,然后发射到第二个bolt。如果喷口发出元组[“bob”]和[“john”],那么第二个螺栓将发出单词[“bob !!!!!!”]和[“john !!!!!!”]。

此代码使用setSpoutsetBolt方法定义节点。这些方法将用户指定的id,包含处理逻辑的对象以及节点所需的并行数量作为输入。在这个例子中,spout被赋予id“words”,bolt被赋予id“exclaim1”和“exclaim2”。

包含处理逻辑的对象为spouts实现IRichSpout接口,为bolt实现IRichBolt接口。

最后一个参数是节点所需的并行度,是可选的。它指示应在群集中执行该组件的线程数。如果省略它,Storm将只为该节点分配一个线程。

setBolt返回一个InputDeclarer对象,用于定义Bolt的输入。这里,组件“exclaim1”声明它想要使用shuffle分组读取组件“words”发出的所有元组,组件“exclaim2”声明它想要使用shuffle分组读取组件“exclaim1”发出的所有元组。“shuffle grouping”意味着元组应该从输入任务随机分配到bolt的任务中。有许多方法可以在组件之间对数据进行分组。这些将在几个部分中解释。

如果你想要组件“exclaim2”来读取组件“words”和组件“exclaim1”发出的所有元组(即:一个bolt有多个数据开源),你可以像这样编写组件“exclaim2”的定义:

  1. builder.setBolt("exclaim2", new ExclamationBolt(), 5)
  2. .shuffleGrouping("words")
  3. .shuffleGrouping("exclaim1");

如您所见,可以链接输入声明以指定Bolt的多个源。

让我们深入研究这种拓扑中的spout和bolt的实现。Spouts负责向拓扑中发送新消息。此拓扑中的TestWordSpout从列表[“nathan”,“mike”,“jackson”,“golda”,“bertels”]中每隔100ms发出一个随机单词。TestWordSpout中的nextTuple()实现如下所示:

  1. public void nextTuple() {
  2. Utils.sleep(100);
  3. final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
  4. final Random rand = new Random();
  5. final String word = words[rand.nextInt(words.length)];
  6. _collector.emit(new Values(word));
  7. }

如您所见,实现非常简单

ExclamationBolt 会对它的输入字符串 后面追加“!!!”,让我们看一下该类的完整实现:

  1. public static class ExclamationBolt implements IRichBolt {
  2. OutputCollector _collector;
  3. @Override
  4. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  5. _collector = collector;
  6. }
  7. @Override
  8. public void execute(Tuple tuple) {
  9. _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
  10. _collector.ack(tuple);
  11. }
  12. @Override
  13. public void cleanup() {
  14. }
  15. @Override
  16. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  17. declarer.declare(new Fields("word"));
  18. }
  19. @Override
  20. public Map<String, Object> getComponentConfiguration() {
  21. return null;
  22. }
  23. }

prepare方法为bolt提供了一个OutputCollector,用于从该bolt发送元组。元组可以随时从bolt中发出 - 在prepare,execute或cleanup方法中,甚至在另一个线程中异步发出。这个prepare实现只是将OutputCollector保存为稍后在execute方法中使用的实例变量。

execute方法从一个bolt的输入接收一个元组。ExclamationBolt从元组中获取第一个字段并发出一个带有字符串“!!!”的新元组附加到它的后面。如果您实现了一个订阅多个输入源的bolt,您可以使用Tuple#getSourceComponent方法找出Tuple来自哪个组件。

在execute方法中还有一些其他的事情,即输入元组作为第一个参数传递给emit,输入元组在最后一行上被ack。这些是Storm的可靠性API的一部分,用于保证不会丢失数据,本教程稍后将对此进行说明。

当Bolt被关闭时调用cleanup方法,并且应该清除所有打开的资源。无法保证在集群上调用此方法:例如,如果任务正在运行的计算机本崩溃,则无法调用该方法。清理方法适用于在本地模式下运行拓扑(在进程中模拟Storm集群),并且您希望能够运行并终止许多拓扑而不会遭受任何资源泄漏。

declareOutputFields方法声明ExclamationBolt发出一个带有一个名为“word”的字段的1元组。

getComponentConfiguration方法允许您配置此组件运行方式的各个方面。这是一个更高级的主题,将在Configuration上进一步说明。

在bolt实现中通常不需要cleanup和getComponentConfiguration等方法。您可以通过使用在适当的地方提供默认实现的基类来更简洁地定义螺栓。通过扩展BaseRichBolt可以更简洁地编写ExclamationBolt,如下所示:

  1. public static class ExclamationBolt extends BaseRichBolt {
  2. OutputCollector _collector;
  3. @Override
  4. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  5. _collector = collector;
  6. }
  7. @Override
  8. public void execute(Tuple tuple) {
  9. _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
  10. _collector.ack(tuple);
  11. }
  12. @Override
  13. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  14. declarer.declare(new Fields("word"));
  15. }
  16. }

Running ExclamationTopology in local mode

让我们看看如何在本地模式下运行ExclamationTopology,看看它是否正常工作。

Storm有两种操作模式:本地模式和分布式模式。在本地模式下,Storm通过使用线程模拟工作节点(worker nodes)来完全执行。本地模式对于拓扑的测试和开发很有用。您可以在本地模式下阅读有关在本地模式下运行拓扑的更多信息。

要以本地模式运行拓扑,请运行命令storm local而不是storm jar。

Stream groupings

流分组告诉拓扑如何在两个组件之间发送元组。请记住,spouts和bolt在集群中并行执行任意数量的任务。如果您查看拓扑在task级别的执行情况,它看起来像这样:

Tutorial - 图3

当一个Bolt A的任务向Bolt B发出一个元组时,它应该将该元组发送给哪个任务(task)?

“流分组”通过告诉Storm如何在多组任务之间发送元组来回答这个问题。在我们深入研究不同类型的流分组之前,让我们先来看看’’storm-starter’’中的另一种拓扑结构。这个WordCountTopology读出一个spout的句子,并从WordCountBolt流出它之前看过该单词的总次数:

  1. TopologyBuilder builder = new TopologyBuilder();
  2. builder.setSpout("sentences", new RandomSentenceSpout(), 5);
  3. builder.setBolt("split", new SplitSentence(), 8)
  4. .shuffleGrouping("sentences");
  5. builder.setBolt("count", new WordCount(), 12)
  6. .fieldsGrouping("split", new Fields("word"));

SplitSentence为它接收的每个句子中的每个单词发出一个元组,而WordCount在一个单词到数字的内存中保存一个映射。每次WordCount收到一个单词时,它都会更新其状态并发出新的单词计数。

有几种不同的流分组。

最简单的分组称为“shuffle grouping”,它将元组发送到随机任务。在WordCountTopology中使用随机分组将RandomSentenceSpout中的元组发送到SplitSentence bolt。它具有在所有SplitSentence bolt任务中均匀分配处理元组的工作的效果。

一种更有趣的分组是“fields grouping”。在SplitSentence bolt和WordCount bolt之间使用字段分组。对于WordCount bolt的功能而言,同一个词始终被发送到同一个task进行执行至关重要。否则,多个task将看到相同的单词,并且每个任务都会为计数发出不正确的值,因为每个任务都有不完整的信息。字段分组允许您按字段的子集对流进行分组。这会导致该字段子集的相同值转到同一任务。由于WordCount使用“word”字段上的字段分组来订阅SplitSentence的输出流,因此相同的单词始终会转到同一个任务,并且bolt会生成正确的输出。

fields grouping”是实现流连接和流聚合以及大量其他用例的基础。在引擎盖下,“fields grouping”是使用mod散列实现的。

还有一些其他类型的流分组。您可以在Concepts上阅读有关它们的更多信息。

Guaranteeing message processing

在本教程的前面部分,我们跳过了如何发出元组的几个方面。这些方面是Storm的可靠性API的一部分:Storm如何保证从喷口流出的每条消息都将得到完全处理。请参阅保证消息处理,以获取有关其工作原理的信息以及您作为用户必须执行的操作以利用Storm的可靠性功能。

Trident

Storm保证每条消息至少会通过拓扑播放一次。一个常见的问题是“你怎么做像在暴风雨之上点数那样的事情?难道你不会超额计算?”Storm有一个名为Trudent的更高级别的API,它允许您为大多数计算实现一次一次的消息传递语义。在这里阅读更多关于三叉戟

Distributed RPC

本教程展示了如何在Storm之上进行基本的流处理。使用Storm的原语可以做更多的事情。Storm最有趣的应用之一是Distributed RPC,您可以在其中并行化强大功能的计算。在此处阅读有关Distributed RPC的更多信息。

Conclusion

本教程概述了开发,测试和部署Storm拓扑。其余的文档深入探讨了使用Storm的所有方面。