Concepts
此页面列出了Storm的主要概念以及可以找到更多信息的资源链接。讨论的概念是:
- Topologies
- Streams
- Spouts
- Bolts
- Stream groupings
- Reliability
- Tasks
- Workers
Topologies
实时应用程序的逻辑被打包到Storm拓扑中。Storm拓扑类似于MapReduce作业。一个关键的区别是MapReduce作业最终完成,而拓扑结构永远运行(当然,直到你杀死它)。拓扑是与流分组连接的喷口和螺栓的图形。这些概念如下所述。
Resources:
- TopologyBuilder: use this class to construct topologies in Java
- Running topologies on a production cluster
- Local mode: Read this to learn how to develop and test topologies in local mode.
Streams
流是Storm中的核心抽象。流是无限的元组序列,以分布式方式并行处理和创建。Streams是使用模式定义的,该模式命名流的元组中的字段。默认情况下,元组可以包含整数,长整数,短整数,字节,字符串,双精度数,浮点数,布尔值和字节数组。您还可以定义自己的序列化程序,以便可以在元组中本机使用自定义类型。
Resources:
- Tuple: streams are composed of tuples
- OutputFieldsDeclarer: used to declare streams and their schemas
- Serialization: Information about Storm’s dynamic typing of tuples and declaring custom serializations
Spouts
spout是拓扑中的流的来源。通常,spouts将从外部源读取元组并将它们发送到拓扑中(例如,Kestrel队列或Twitter API)。spouts可以是可靠的或不可靠的。如果一个元组无法被Storm处理,那么一个可靠的spout能够重放一个元组,而一个不可靠的spout一旦发出就会忘记元组。
Spouts可以发出多个流。为此,请使用OutputFieldsDeclarer的declareStream
方法声明多个流,SpoutOutputCollector上使用emit
方法时指定要发出的流。
spouts的主要方法是nextTuple
。nextTuple
要么在拓扑中发出新元组,要么在没有新元组发出时返回。nextTuple
的实现必须是非阻塞的,因为Storm会在同一个线程上调用所有spout方法。
spout上的其他主要方法是ack
和fail
。当Storm检测到从spout发出的元组通过拓扑成功完成或未能完成时,会调用这些方法。ack
和fail
仅被称为可靠的spout。有关更多信息,请参阅Javadoc。
Resources:
- IRichSpout: this is the interface that spouts must implement.
- Guaranteeing message processing
Bolts
拓扑中的所有处理都是用bolt完成的。Bolts可以执行任何操作,包括过滤,函数,聚合,连接,与数据库交谈等。
bolt可以进行简单的流转换。进行复杂的流转换通常需要多个步骤,因此需要多个bolt。例如,将推文流转换为趋势图像流至少需要两个步骤:一个bolt用于为每个图像执行转推滚动计数,以及一个或多个bolt用于流出前X位的图像(您可以执行此操作)使用三个bolt而不是两个以更可扩展的方式进行特定的流转换。
bolt可以发出多个流。为此,请使用OutputFieldsDeclarer的declareStream
方法声明多个流,并在OutputCollector上使用emit
方法时指定要发出的流。
声明bolt的输入流时,总是订阅另一个组件的特定流。如果要订阅另一个组件的所有流,则必须单独订阅每个组件。InputDeclarer具有语法糖,用于订阅在默认流ID上声明的流。假设declarer.shuffleGrouping(“1”)
订阅组件“1”上的默认流,并且等同于declarer.shuffleGrouping(“1”,DEFAULT_STREAM_ID)
。
bolt中的主要方法是execute
方法,它接收一个新的元组作为输入。Bolts使用OutputCollector对象发出新元组。对于他们处理的每个元组,Bolts必须在OutputCollector上调用ack
方法,以便Storm知道元组何时完成(并且最终可以确定它可以安全地确定原始的spout元组)。对于处理输入元组,基于该元组发出0或更多元组,然后对输入元组进行调整的常见情况,Storm提供了一个IBasicBolt接口,它自动执行acking。
它非常适合在bolt中启动新线程,以便于异步处理。OutputCollector是线程安全的,可以随时调用。
Resources:
- IRichBolt: this is general interface for bolts.
- IBasicBolt: this is a convenience interface for defining bolts that do filtering or simple functions.
- OutputCollector: bolts emit tuples to their output streams using an instance of this class
- Guaranteeing message processing
Stream groupings
定义拓扑的一部分是为每个blot指定它应该作为输入接收的流。流分组定义了如何在bolt的任务中对该流进行分区。
Storm中有八个内置流分组,您可以通过实现CustomStreamGrouping接口来实现自定义流分组:
- Shuffle grouping:元组随机分布在bolt的任务中,使得每个bolt都能保证获得相同数量的元组。
- Fields grouping: 该流由分组中指定的字段分区。例如,如果流按“user-id”字段分组,则具有相同“user-id”的元组将始终执行相同的任务,但具有不同“user-id”的元组可能会执行不同的任务。
- Partial Key grouping: 该流按分组中指定的字段进行分区,如Fields grouping,但在两个下游bolt之间进行负载平衡,这可在传入数据偏斜时提供更好的资源利用率。本论文对其工作原理及其提供的优势进行了很好的解释。
- All grouping:该流将在所有bolt任务中复制。小心使用此分组。
- Global grouping:整个流程转到了一个bolt的任务中。具体来说,它转到id最低的任务。(具体可以参见GlobalGrouping的实现,其先对taskIdList进行正向排序,然后获取排序后的第一个元素,因此总是选择的是id最小的。)
- None grouping: This grouping specifies that you don’t care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
- Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the
emit
method in OutputCollector (which returns the task ids that the tuple was sent to). - Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
Resources:
- TopologyBuilder: use this class to define topologies
- InputDeclarer: this object is returned whenever
setBolt
is called onTopologyBuilder
and is used for declaring a bolt’s input streams and how those streams should be grouped
Reliability
Storm保证每个spout元组都将由拓扑完全处理( fully processed )。它通过跟踪每个spout元组触发的元组树(tuple tree)并确定元组树何时成功完成来实现此目的。每个拓扑都有一个与其关联的“消息超时”。如果Storm未能检测到该超时内已完成一个spout元组,则它会使元组失败并在以后重播。
要利用Storm的可靠性功能,必须在创建元组树中的新边时告诉Storm,并在完成处理单个元组时告诉Storm。这些是使用用于发出元组的 OutputCollector对象来完成的。锚定是在emit
方法中完成的,你声明你已经使用ack方法完成了一个元组。
This is all explained in much more detail in Guaranteeing message processing.
Tasks
每个spout或bolt在整个群集中执行尽可能多的任务。每个任务(task)对应一个执行线程,流分组定义如何将元组从一组任务发送到另一组任务。您可以在TopologyBuilder的setSpout和setBolt方法中为每个spout或bolt设置并行度。
Workers
拓扑在一个或多个工作进程中执行。每个工作进程都是物理JVM,并执行拓扑的所有任务的子集。例如,如果拓扑的组合并行度为300且分配了50个worker,则每个worker将执行6个任务(作为worker进程中的线程)。Storm试图在所有worker之间平均分配任务。
Resources:
- Config.TOPOLOGY_WORKERS: this config sets the number of workers to allocate for executing the topology
Performance Tuning(性能调优)
Refer to performance tuning guide.