- 平台工程
- Jillur Quddus
- 2017年6月7日
如何构建数据管道以实时处理基于事件的数据
2012年,我第一次听到“Hadoop”和“大数据”这两个词。当时,这两个词几乎是彼此的同义词 - 我经常参加会议,客户想要一个’大数据’解决方案只是因为它已经成为最新的流行词,很少或根本不考虑他们的要求和数据实际上保证一个。“大数据”当然不仅仅是Hadoop,随着批量和实时的可扩展技术变得更加成熟,我们对它们的了解也越来越多。但是,对于那些现在迈出第一步的人来说,就像我在2012年回到“大数据”技术的世界一样,你可能会问自己一个类似于我所问的问题 - 即,
仅举几个你可能听说过的文章,然后磕磕绊绊地看到这篇文章(没有特别的顺序,就像我记得的那样):Apache Hadoop,Apache Hive,Apache Pig,Apache HBase,Apache Spark,Apache Storm,Apache Kafka,Apache Flume,Apache Cassandra,MongoDB,Redis,Oracle Berkeley,Akka,Spray(由Akka HTTP取代),Apache TinkerPop,Apache Giraph,Apache Mahout,阿帕奇的ZooKeeper,Couchbase,阿帕奇弗林克,弹性搜索,Elassandra,SOLR,Voldermort(是的,你没看错),MemcacheDB,DynamoDB …..等等。
在接下来的几周和几个月中,我希望在一系列知识库文章中写下这些技术中的每一种,重点关注您可能决定使用它们的实际情况,最重要的是使用动手实例。作为本系列的第一篇文章,我想通过介绍一些典型的实时数据处理场景,同时介绍一些非常重要的流技术,即Apache,将其献给那些迈向大数据技术世界的人。Flume,Apache Kafka和Apache Spark。
目标
我们在本文中的目标是创建一个高吞吐量,可扩展,可靠且容错的数据管道,用于获取基于事件的数据并将这些事件流式传输到Apache Spark,后者将解析其内容,所有这些都将在接近实时。(在我的下一篇文章中,我将讨论如何使用我们的实时数据管道来执行分析并在Apache Spark中使用此数据流构建预测模型!)
实时流数据流的“Hello World”
我在下面描述的示例将使用推文作为其实时数据源。Twitter是一个很好的例子,因为它是免费的,它可以实时生成大量数据,Twitter用户可以设置自己的应用程序来访问Twitter提供的推文流。因此,在继续阅读本文之前,请确保您拥有Twitter帐户。设置完Twitter帐户后,请转到Twitter的“ 应用程序管理”页面,创建并注册您的应用程序。假设您已正确完成此操作,您将获得以后需要的以下身份验证信息: - 消费者密钥/ API密钥(例如A1b2C3DEfGH4IjkL5mnopq678) - 消费者秘密/ API秘密(例如Zy1XW2vUtS3R4QPON5mLK6JiHG7FeD8CBaAbC9876DEf543Ghi) - 访问令牌(例如1234567890-5mLK6JiHG7FeD8CBaAbC9876DEf543GA77888dH) - 访问令牌秘密(例如aAbC9876DEf543GA77888dHD8CBaAbC9876D5mLK6JiHG7) Apache Flume
因此,让我们从相对较老的技术之一Apache Flume开始。正如其网站上所述,“ Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有强大的容错性和容错性具有可调的可靠性机制和许多故障转移和恢复机制。它使用简单的可扩展数据模型,允许在线分析应用程序“那么这一切意味着什么呢?实际上,Flume允许我们有效地收集(摄取),聚合并将来自多个来源(例如Twitter的推文流或日志数据)的大量流数据移动到Hadoop中( Flume紧密集成在哪里我们可以使用Hadoop的分布式文件系统(HDFS)来存储它。我们可以从中分析它.Tlume的一个常见用例是充当数据管道,将简单的基于事件的数据摄入Hadoop和HBase一样,但它也支持开箱即用的其他技术和集中式数据存储。让我们通过研究其核心组件来进一步分解: - 事件 - 由Flume处理的数据事件本身,例如推文或日志文件中的条目。 - 来源 - 数据进入Flume的事件数据的消费者。对于推文,这可能是将您的应用程序链接到Twitter流的API或连接器,例如org.apache.flume.source.twitter.TwitterSource。对于日志条目,这可能是日志框架,例如log4j。 - 通道 - 一个被动存储器,充当源与其事件数据的消费者之间的管道(即,接收器 - “下沉通道”)。保持事件直到被Flume Sink消耗的公共通道包括本地文件系统(文件通道)或存储器(存储器通道)。事件数据可以在Flume Sink“排空”之前存储在一个或多个通道中。 - 接收器 - 从通道中使用/移除事件数据并将其传送到目标的机制。常见的接收器是HDFS接收器,它将事件数据保存到HDFS。 - 代理 - Flume实例,即在JVM中运行的源,通道和接收器的集合。 Flume的一大优点是Sinks可以将事件数据转发到另一个Flume Agent的Flume Source,即代理可以链接在一起形成复杂的数据流。同一代理程序中的源和接收器与存储在通道中的事件异步运行。此外,Flume通过分别在发送和接收代理上启动单独的事务,保证从一个Flume代理向下一个发送消息。最后,Flume可以轻松水平缩放,因为没有中央协调器节点,Flume Agent彼此独立运行,没有单点故障。所有这一切使Apache Flume成为高吞吐量实时事件数据流的强大服务,为大数据系统提供支持。
我们现在准备配置和部署我们的第一个Flume Agent!我们的第一个Flume Agent将通过您在上面创建的Twitter应用程序从Twitter Stream获取推文,使用与Flume开箱即用捆绑的演示Flume Twitter Source类。这个演示Twitter Source连接到Twitter Stream并不断下载推文样本,将它们转换为Avro格式并将这些Avro事件发送到我们的Flume Sink。我们将使用一个内存通道,记录器接收器将使用它来输出推文并将它们输出到控制台。请注意,我使用的是CentOS 7 Minimal Installation Server来执行以下命令,但Flume应该与其他Linux发行版同样有效。
bash
# Unpack the Flume Binary to a directory of choice
tar -xzf apache-flume-1.7.0-bin.tar.gz
# Create a new Flume Configuration File to configure our Flume Twitter Agent
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = Logger
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
# Channel Configuration - Memory Channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
# Sink Configuration - Logger Sink
TwitterAgent.sinks.Logger.type = logger
TwitterAgent.sinks.Logger.channel = MemChannel
# Launch the Flume Agent to sink to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console
假设一切顺利,记录器接收器应该以接近实时的方式将推文以JSON格式输出到控制台,如下所示(因为这是英国的选举时间,我使用了选举主题关键字,如ge2017,选举等等) ):
目前我们已经设置了Logger Sink,并使用-Dflume.root.logger = DEBUG,console在我们的启动命令中指示记录器输出到控制台。在conf文件夹中,您将找到log4j.properties以根据需要更新记录器属性。默认情况下,它将记录到Flume主目录内的logs目录和名为flume.log的文件。要写入HDFS,我们需要做的就是按照以下方式修改我们的接收器:
bash
# Update the Sink to write to your HDFS
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
...
# Sink Configuration - HDFS Sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
# Ensure that the owner of the Flume Agent has permission to write to this HDFS Directory
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:9000/user/flume/twitter/data/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
# Because we have used %Y/%m/%d/%H formatting in our HDFS Path
# we must specify how Flume should get the timestamp of the tweet.
# To use the timestamp of the destination i.e. the HDFS Sink, we can use
# TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true
# Alternatively, to use the timestamp of the actual event (tweet), we can use Source interceptors
TwitterAgent.sources.Twitter.interceptors = interceptor1
TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp
# Ensure that the Hadoop Libraries are on the Flume Classpath so that Flume knows how to access and write to your HDFS
cp conf/flume-env.sh.template conf/flume-env.sh
vi conf/flume-env.sh
# Update the Flume Classpath to point to the correct directories on your system
# Alternatively, copy the Hadoop Common and HDFS libraries to the the Flume lib folder
FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/hdfs/*"
# Launch the Flume Agent to sink to the HDFS
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf
请注意,我们不再需要在我们的启动命令中显式定义Root Logger作为Flume,并自动从conf目录中读取log4j.properties和flume-env.sh。如果一切顺利,推文将在您上面指定的路径上以近乎实时的方式写入您的HDFS。
要了解有关Apache Flume的更多信息,包括创建复杂的链式数据流及其他源,通道和接收器类型,请参阅Apache Flume文档。
阿帕奇卡夫卡
到目前为止,我们构建了一个数据管道,用于将简单的基于事件的数据(即推文)提取到我们的系统中,写入日志文件,控制台或HDFS。我们的目标是让他们使用Apache Spark,以便我们可以近乎实时地解析它们,并最终在Spark中构建预测模型(我将在下一篇文章中介绍)。我们将在实时数据管道中讨论的下一项技术是Apache Kafka。
正如其网站上所述“ Apache Kafka是一个分布式流媒体平台 ”。那么为什么我们需要在数据管道中使用它呢?好吧,当您需要构建实时流数据流水线时,Kafka非常适合,就像我们在本文中一样,它以可靠,可扩展和容错的方式在系统之间获取数据。在尝试将Apache Kafka集成到我们的数据管道之前,让我们探讨一下Apache Kafka背后的一些核心概念。官方的Apache Kafka文档是对Apache Kafka的一个很好的介绍,所以我强烈建议您在第一个实例中阅读它。对于那些已经熟悉发布 - 订阅消息传递系统的人来说,您可能对某些概念很熟悉,但我建议您先阅读官方文档。 - 消息 - 消息是可以存储对象的字节数组,例如String和JSON对象。 - 主题 - 主题本质上是消息的类别。您可能有一个主题来存储我们的推文。您可能有另一个主题来存储警报。Kafka在这些主题中存储记录流。由于Kafka是一个分布式系统,因此主题在Kafka集群中的众多节点之间进行分区。还可以跨群集中的节点复制主题。Kafka中的主题本身由分区组成,分区是附加到的有序且不可变的记录序列,正是这些分区允许主题分布在Kafka集群中的节点上,因为主题可以有多个分区。单独的分区完全适合其服务节点。 - 生产者 - 生产者向主题编写/发布数据。生产者选择将哪个记录分配给主题中的哪个分区。通过将密钥附加到消息,生产者可以保证同一密钥中的所有消息将发布到主题中的同一分区。Kafka保证生产者发送到特定主题分区的消息将按照发送顺序附加。 - 消费者 - 消费者从主题阅读(即消费)。主题可以有零个,一个或多个订阅它的消费者,称为多订户主题(在Kafka中总是如此)。当从主题消费时,消费者组可以配置多个消费者,并且发布到主题的每个记录被递送到每个订阅消费者组内的一个消费者,具有相同密钥的所有消息到达相同的消费者。如果所有使用者属于同一个使用者组,则记录将在Consumer实例上有效地进行负载平衡,因为每个Consumer实例将从他们订阅的每个主题中的唯一分区子集中读取消息。如果所有消费者属于不同的消费者组,则每条记录将广播到所有消费者实例。 Kafka的伟大之处在于它是一个分布式,可靠且容错的流媒体平台。对于具有复制因子N的主题,可以在不丢失任何记录的情况下发生N-1服务器故障。此外,由于每个主题都是附加的有序记录序列,因此分区中的每条消息都会分配一个唯一的偏移量—Kafka不记录哪些消息已被哪些消费者读取,以便仅保留未读消息,而是保留所有消息消息可配置的时间量,无论它们是否已被消耗。因此,Kafka可以轻松处理大量的Consumer实例,因为消费者自己可以跟踪他们的读取位置,并且可以很好地存储大量数据,具有低延迟,高性能和增加的复制。
除了作为传统消息代理的替代品之外,Kafka还允许我们构建低延迟数据管道,因为我们可以开发消费者以可靠和容错的方式订阅基于事件的实时数据,这是特别有用的对于关键的实时数据,必须保证数据的交付以用于集成和向前的目的。虽然我们实时处理推文的例子并不十分重要,但现在将Kafka部署到我们的实时数据管道中可以教会我们在为客户实施任务关键型实时生产系统时的宝贵经验。
卡夫卡频道
那么我们如何将Apache Kafka集成到我们刚刚起步的数据管道中呢?那么,我们将实现以下设计: - 制片人 - 我们将使用Flume Twitter Source收集并直接将我们的推文发布到Kafka频道,无需额外的缓冲 - 主题 - 我们将配置一个主题来存储我们在Kafka的推文 - 消费者 - 我们将使用Spark Streaming API及其与Kafka的集成来使用Kafka Channel中的消息 在我们继续之前,让我们花点时间讨论一下这种方法的优缺点。正如我之前所描述的,Flume Channels是位于Flume Sources和Flume Sinks之间的缓冲区,允许Sources收集数据而不用担心Sinks,它们可能以不同的速率运行。此外,使用事务来实现对通道的写入和读取 - 仅在提交事务之后,该事务中的一批事件才可用于接收器。Flume支持开箱即用的各种频道: - 内存通道 - 事件存储在内存中的队列中。使用内存通道的优点是它支持非常高的吞吐量,因为所有数据都保存在内存中。但是,对于必须保证交付的关键任务事件,不应使用它,因为数据不会持久存在。因此,如果代理失败,您可能会丢失数据。 - 文件通道 - 文件通道将所有事件写入磁盘并设计为高度并发 - 它可以同时处理多个源和接收器。如果无法容忍数据丢失并且数据持久性和可恢复性很重要,则文件通道可能会以性能为代价。 其他开箱即用的通道包括JDBC通道(事件持久保存到数据库)和可溢出内存通道(事件存储在内存和磁盘中)。Flume还支持Kafka频道,其中事件存储在Kafka集群中,提供高可用性,可靠性和复制。在我们的例子中,直接从我们的Flume Twitter Source使用Kafka频道的优势在于不需要额外的缓冲,它增加了我们数据管道的可靠性,并且通过使用没有显式接收器的源拦截器,它允许我们编写将事件转换为Kafka主题以供其他应用程序使用,例如我们最终的Spark Streaming Application。根据您的要求,您可以设计不同的Flume代理和数据管道。例如,如果要求高性能且数据不是任务关键型,
现在我们已经介绍了一些理论,让我们开始实现其余的数据管道。和以前一样,我将使用一个CentOS 7 Minimal Installation服务器来执行以下命令。在生产环境中,您将拥有一个多节点集群,但出于简单的开发目的,我将使用单节点集群。请注意,ZooKeeper超出了本文的范围,但我将在以后再回到它。现在,只需将ZooKeeper视为配置的集中服务(例如,从中央源引导集群配置)和分布式集群管理(例如实时节点状态) - 即Kafka使用ZooKeeper来帮助形成其生产者集群,消费者和代理节点。
bash
# Unpack the Kafka Binary to a directory of choice
tar -xzf kafka_2.12-0.10.2.1.tgz
# Basic Configuration of the internal ZooKeeper Service
vi config/zookeeper.properties
# Absolute path to the ZooKeeper Data Directory and Client Port
dataDir=<ZooKeeper Data Directory>
clientPort=2181
# Basic Configuration of the Kafka Server
vi config/server.properties
# Kafka Server Log Directory and ZooKeeper Co-ordinator Service
log.dirs=<Kafka Log Directory>
zookeeper.connect=<Hostname>:2181
# Start the internal ZooKeeper Service
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start the Kafka Server
bin/kafka-server-start.sh config/server.properties
# Create the Twitter Topic for our Tweets
bin/kafka-topics.sh --create --zookeeper <ZooKeeper Hostname>:2181 --replication-factor 1 --partitions 2 --topic twitter
Created topic "twitter".
现在Kafka已经启动并且我们已经创建了我们的Twitter主题,我们需要更新我们的Flume代理以写入Kafka频道。目前,我们将Flume Logger Sink输出到控制台,以便我们确认我们新设计的Flume Agent仍在工作。在本文的下一部分中,我们将讨论Apache Spark以及如何配置它以从Kafka Channel读取。
bash
# Update the Flume Agent to act as the Kafka Producer and Consumer
cd $FLUME_HOME
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = Kafka
TwitterAgent.sinks = Logger
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = Kafka
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
TwitterAgent.sources.Twitter.interceptors = interceptor1
TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp
# Channel Configuration - Kafka Channel
TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
TwitterAgent.channels.Kafka.capacity = 10000
TwitterAgent.channels.Kafka.transactionCapacity = 100
TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
TwitterAgent.channels.Kafka.topic = twitter
TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
TwitterAgent.channels.Kafka.parseAsFlumeEvent = true
# Sink Configuration - Logger Sink
TwitterAgent.sinks.Logger.type = logger
TwitterAgent.sinks.Logger.channel = Kafka
# Launch the Flume Agent using the Kafka Channel and sinking to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console
巴什复制当您运行上面的最终命令时,您的控制台将生成大量输出,包括以前的推文。如果您想明确检查Kafka是否正在接收来自Flume Producer的消息,我们可以通过命令行运行Kafka Consumer来订阅Twitter主题并从头开始显示所有消息。
bash
# Run a Kafka Consumer subscribed to the Twitter Topic
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper <ZooKeeper Hostname>:2181 --topic twitter --from-beginning
Apache Spark我们现在已经设置了一个Flume Agent来使用我们的Twitter应用程序收集推文,将这些推文发布到Kafka主题,最后使用来自Kafka频道的推文并将它们下载到Logger或HDFS。本文的最后一部分将介绍如何使用Spark Streaming API将Apache Spark连接到Kafka以使用Kafka的推文。(我的下一篇文章将描述我们如何使用Apache Spark执行一些分析并使用我们的实时推文流构建预测模型!)
Apache Spark是一个大数据处理引擎和集群计算框架,能够批量和实时地执行大规模数据处理和分析,包括机器学习。Spark Streaming扩展了核心Spark API,允许我们在实时流数据流上重复使用相同的代码进行批处理,从而允许我们执行实时数据处理和分析。与往常一样,Apache Spark官方文档是开始熟悉Apache Spark的最佳场所。 - 弹性分布式数据集 - 弹性分布式数据集(RDR)是Spark中基本的内存数据结构 - 它们是一个不可变的分布式对象集合,分为跨集群中节点的逻辑分区。Spark隐藏了底层分区,因此可以暴露更高级别的API,开发人员可以使用Java,Scala或Python对RDD(如地图和过滤)执行典型操作,而无需担心(过多!)底层框架。RDD存储在内存中并且是不可变的 - RDD上的转换(例如地图)从现有数据集创建新数据集,并且操作返回值。所有变换都是懒惰的意味着结果不是立即计算的,仅在需要操作时计算。RDD也是可缓存的,这意味着数据可以持久保存到其他介质(例如磁盘),并且记录在Spark集群中进行分区和分布。RDD的设计部分是为了有效地运行迭代算法,例如机器学习和图算法,以及数据挖掘。 - Spark上下文 - Spark上下文是Spark应用程序的核心,允许它通过资源管理器(如Mesos,YARN或Apache Spark自己的集群管理器)访问Spark集群。 - 驱动程序 - Spark Driver是一个程序,它声明了对RDD数据进行的一系列转换和操作,并为Spark应用程序托管Spark上下文。驱动程序将Spark应用程序拆分为任务,然后由驱动程序调度和协调。 - Workers - Workers / Slaves是Spark实例,它执行线程池中驱动程序安排的任务。 - 执行程序 - 执行程序是实际执行任务并驻留在Workers中的代理程序,用于报告活动任务的心跳和指标。 在本文的其余部分,我们将专注于Spark Streaming API及其核心组件。 - 离散的流 - 除批量处理外,Spark还支持通过其高吞吐量,可扩展且容错的Streaming API(核心Spark API的扩展)处理实时数据流,例如我们的Twitter流。Discretized Streams,或DStream,代表连续的数据流,可以从输入数据流(如上面介绍的Flume和Kafka)创建。输入数据流被分成批处理(在内部,DStream是一系列连续的RDD),然后由Spark引擎处理(允许我们重用为批处理开发的代码以进行近实时处理)以生成最终的流结果也是分批的。 为了完成本文,我们将使用Spark的Streaming API将Apache Spark与Kafka集成,以使用来自Kafka的推文。显然,这并没有说明Spark的全部分析能力,但在我的下一篇文章中,我们将通过对Spark中的推文流进行分析来构建我们的实时数据管道!
首先,我们将使用Spark自己的集群管理器和我们的CentOS 7 Minimal Installation Server配置和部署一个非常基本的单节点Spark集群。
bash
# Unpack the Spark Binary to a directory of choice
tar -xzf spark-2.1.0-bin-hadoop2.7.tgz
# Basic Single-Node Spark Deployment Environment Configuration
# Note that Spark properties can be set directly using SparkConf passed to your SparkContext
# You can also specify relevant default configuration values using conf/spark-defaults.conf as follows
> vi spark-defaults.conf
# Spark Deployment Environment Properties
# In this example, we will be using Spark's own cluster manager
spark.master spark://<Cluster Manager Hostname>:7077
spark.driver.cores 1
spark.driver.maxResultSize 1g
spark.driver.memory 1g
spark.executor.memory 2g
spark.local.dir <Spark Scratch Directory>
# Networking Properties
spark.driver.host <Driver Hostname>
spark.network.timeout 120s
# Hadoop Properties
spark.hadoop.dfs.replication 1
# Start the Spark Master
sbin/start-master.sh
# Start a Spark Slave
sbin/start-slave.sh spark://<Cluster Manager Hostname>:7077
巴什复制您可以通过
bash
# Update the Flume Agent so that Spark can pull data from a Custom Sink using a Flume Receiver
cd $FLUME_HOME
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = Kafka
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = Kafka
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
# Channel Configuration - Kafka Channel
TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
TwitterAgent.channels.Kafka.capacity = 10000
TwitterAgent.channels.Kafka.transactionCapacity = 100
TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
TwitterAgent.channels.Kafka.topic = twitter
TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
TwitterAgent.channels.Kafka.parseAsFlumeEvent = true
我们现在准备编写我们的Spark Streaming Application来读取Kafka的推文并在Spark中对它们执行一些简单的处理。如上所述,Spark公开了一个更高级别的API,以便开发人员可以用Java,Scala或Python编写他们的Spark应用程序。作为Java开发人员,我自然会选择Java或Scala!Spark Streaming应用程序
Spark Streaming支持两种从Kafka读取数据的方法: - 基于接收器的方法 - 工作器上的执行器内的接收器任务用于接收由Spark Streaming Context启动的作业处理的数据。但是,此方法不可靠,并且在节点故障的情况下可能导致数据丢失。 - 直接方法 - 在这种方法中,Spark定期向Kafka查询每个主题和分区中的最新偏移,这些偏移用于在从Kafka读取时定义偏移范围。这种方法提供了更好的可靠性和效率 我们将使用直接方法来消费来自Kafka的消息并在Spark中处理它们。由于我将用Java编写Spark Streaming Application,因此我将首先创建一个Apache Maven项目来处理所有构建依赖项。我将配置Maven来构建一个胖 JAR,这意味着我们将提交给Apache Spark的最终JAR不仅包括我们的Spark Streaming Application,还包括它需要运行的所有必需依赖项。我的POM文件中有几点需要注意: - Spark Streaming Kafka - 为了将我们的Spark Streaming应用程序与Kafka连接,需要依赖spark-streaming-kafka-0-82.11 - 提供的依赖关系 - Spark-core_2.11和spark-streaming_2.11已在Spark安装中提供,因此我们在POM中提供了这些标记 - Maven Assembly插件 - 我们将使用Maven Assembly Plugin构建我们的胖_ JAR,包括所有必需的依赖项 ```xml
<groupId>io.keisan.knowledgebase.spark.streaming</groupId>
<artifactId>keisan-spark-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- Project Information -->
<name>Spark Kafka Direct Integration</name>
<description>Keisan Knowledgebase Spark Streaming Project - Kafka Direct Integration</description>
<url>https://www.keisan.io/knowledgebase</url>
<organization>
<name>Keisan Ltd</name>
<url>https://www.keisan.io</url>
</organization>
<developers>
<developer>
<id>jillur.quddus</id>
<name>Jillur Quddus</name>
<email>contactus@keisan.io</email>
<url>https://www.keisan.io</url>
<organization>Keisan Ltd</organization>
<organizationUrl>https://www.keisan.io</organizationUrl>
<roles>
<role>Lead Engineer</role>
<role>Data Scientist</role>
</roles>
<timezone>Europe/London</timezone>
</developer>
</developers>
<!-- Repositories -->
<repositories>
<!-- Confluent Repository for KafkaAvroDecoder -->
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<!-- Properties -->
<properties>
<apache.avro.version>1.8.1</apache.avro.version>
<apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
<apache.spark.streaming.2.11.version>2.1.0</apache.spark.streaming.2.11.version>
<apache.spark.streaming.kafka-0-8_2.11.version>2.1.0</apache.spark.streaming.kafka-0-8_2.11.version>
<confluent.kafka.avro.serializer.version>3.2.1</confluent.kafka.avro.serializer.version>
<jdk.version>1.8</jdk.version>
<maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
<maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
<output.directory>/keisan/knowledgebase/spark/jars</output.directory>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- Dependencies -->
<dependencies>
<!-- Apache Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache.avro.version}</version>
</dependency>
<!-- Apache Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${apache.spark.core.2.11.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${apache.spark.streaming.2.11.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${apache.spark.streaming.kafka-0-8_2.11.version}</version>
</dependency>
<!-- Confluent Kafka Avro Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.kafka.avro.serializer.version}</version>
</dependency>
</dependencies>
<!-- Build -->
<build>
<!-- Plugins -->
<plugins>
<!-- Maven Compiler: Compile the Sources of the Project -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugins.maven-compiler-plugin.version}</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
<!-- Maven Assembly: Aggregate project output with its dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven.plugins.maven-assembly-plugin.version}</version>
<configuration>
<!-- Final JAR Filename -->
<finalName>keisan-spark-kafka-${project.version}</finalName>
<!-- Include all Project Dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- JAR with dependencies Output Target Directory -->
<outputDirectory>${output.directory}</outputDirectory>
</configuration>
<executions>
<!-- Bind the assembly to the package phase -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
我们现在准备开始开发我们的Spark Streaming Application。要创建输入DStream,我们需要导入_KafkaUtils_并使用Spark Streaming Context创建直接流,Kafka Broker正在侦听的主机名和端口以及我们要从中使用消息的Kafka主题。<br />**简单的纯文本处理**<br />在下面的示例中,我们只是使用Kafka的推文,将值解码为Strings并将它们输出到Spark Executor Standard Out。
```java
package io.keisan.knowledgebase.spark.streaming.kafka;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
/**
* Example Spark Streaming Application with Kafka Direct Integration
*
* Periodically query Kafka for the latest offsets in each Topic and Partition.
* Consume the tweets using a String Decoder and display them in the console.
*
* Usage: StreamingKafkaDirectStringDecoder
* broker: The hostname and port at which the Kafka Broker is listening
*
* @author jillur.quddus
* @version 0.0.1
*
*/
public class StreamingKafkaDirectStringDecoder {
public static void main(String[] args) throws InterruptedException {
if ( args.length != 1 ) {
System.err.println("Usage: StreamingKafkaDirectStringDecoder <broker>");
System.exit(1);
}
// Create a Java Streaming Context with a Batch Interval of 5 seconds
SparkConf conf = new SparkConf().setAppName("Kafka Direct String Decoder");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
// Specify the Kafka Broker Options and set of Topics
String broker = args[0];
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", broker);
Set<String> topics = Collections.singleton("twitter");
// Create an input DStream using KafkaUtils and simple plain-text message processing
JavaPairInputDStream<String, String> kafkaDirectStream = KafkaUtils.createDirectStream(jssc,
String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
kafkaDirectStream.foreachRDD(rdd -> {
rdd.foreach(record -> System.out.println(record._2));
});
// Start the computation
jssc.start();
// Wait for the computation to terminate
jssc.awaitTermination();
}
}
Avro处理
请注意,我们在数据管道中使用的Flume Twitter Source实际上将推文转换为Avro格式,并将这些Avro消息发送到下游。Avro是一个基于数据序列化模式和语言中立的系统,它使用JSON来声明数据结构和模式。您可以使用返回Bytes原始数组的DefaultDecoder,然后使用Avro的二进制解码器或Confluent的KafkaAvroDecoder解码它,以接收带有Avro记录的消息,而不是使用简单地将值解码为上面的Streaming应用程序中的字符串的StringDecoder。他们的价值观
运行Spark Streaming应用程序
我们现在准备运行我们的Spark Streaming Application!使用Maven构建胖 JAR并将其提交给Spark集群。因为出于本文的目的,我们仅使用单节点Spark集群进行开发和调试,所以在客户端模式下部署我们的应用程序是有意义的。
# Submit our Spark Streaming Application
cd $SPARK_HOME
bin/spark-submit --class io.keisan.knowledgebase.spark.streaming.kafka.StreamingKafkaDirectStringDecoder --deploy-mode client /keisan/knowledgebase/spark/jars/keisan-spark-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar <Kafka Broker Hostname:Port>
# Launch the Flume Agent using the Twitter Source and Kafka Channel
cd $FLUME_HOME
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf
如果您现在检查Spark Executor输出,您应该能够看到从近乎实时地从Kafka流向Spark的推文!
我们现在已经成功开发了一种近实时,高吞吐量,可靠且容错的数据流水线。Apache Flume用于收集发布到Apache Kafka的基于事件的数据(在我们的示例中为推文)。然后,Apache Spark用于使用来自Apache Kafka的数据并执行接近实时的数据处理。显然,简单地摄取和打印数据绝不会展示Apache Spark的功能!因此,在我的下一篇文章中,我将讨论如何在Spark中基于事件的数据流实时构建预测模型和实时数据分析。