如何使用Apache Spark中的实时机器学习管道构建预测模型?

    1-main-image.jpg


    在前一篇题为“使用Apache Kafka和Spark的实时数据管道”的文章中,我描述了如何构建高吞吐量,可扩展,可靠且容错的数据管道,能够获取基于事件的数据并最终将这些事件流式传输到我们处理它们的Apache Spark。我通过简单地使用Apache Spark来使用基于事件的数据并将它们打印到控制台来结束上一篇文章。在我上一篇题为“带有Avro Bijection的Apache Kafka Producer”的文章中,我更新了我们的数据管道,使用Apache Kafka Producer在Twitter4j包装器中使用Twitter的Hosebird客户端收集推文,其中推文使用Avro库和Bijection进行序列化。在本文中,我们将使用更新的基于事件的数据管道,使用Apache Spark实时构建预测模型。
    目标
    我们在本文中的目标是对有关航空公司的推文进行实时情绪分析 - 特别是考虑到英国航空公司最近的全球IT故障!我们将尝试自动将关于航空公司的推文分类为对该航空公司的负面或非负面因为我们的数据管道几乎实时接收它们。
    数据
    我们将用于构建我们的训练模型的数据集是来自CrowdFlower的 2015年2月美国航空公司的14,640条推文的集合。这些推文已经被分类,以确定他们的文字内容是否对所涉及的航空公司是积极的,消极的或中立的。可以使用以下链接下载数据集。
    基础
    让我们首先解释一些建模概念背后的基本理论,我将在本文后面介绍。我们知识库背后的首要目标是让尽可能多的人能够获得有关大数据和数据分析的知识。话虽如此,虽然本节旨在介绍一些更基本的概念,但很难在一页中详细解释数学理论。因此,如果这篇文章引起您的兴趣(我希望它会!),我建议您查看一些在CourseraedX等网站上提供的精彩在线课程,或者甚至注册更正式的课程。像开放大学这样的组织。
    预测模型
    建立预测模型以进行预测!监督学习是一种机器学习算法,它使用训练数据集根据已知结果训练模型。然后将该模型应用于之前未见过的新数据进行预测。这些预测可以采用分类预测的形式(例如,是/否,正/负/中性等) - 称为分类算法 - 或连续预测(例如价格,权重等) - 称为回归算法。示例监督学习算法包括线性回归,逻辑回归以及分类和回归树。在本文中,我将介绍后者 - 分类和回归树 - 并展示我们如何使用这些算法来训练模型,以根据我们上面下载的训练数据集对我们的推文进行分类。
    分类和回归树(CART)
    分类和回归树(CART)模型生成可以遍历以预测结果的。对于那些熟悉线性回归Logistic回归的人来说,CART不假设像线性回归这样的线性模型,与Logistic回归相比,它更容易理解和解释。以下来自维基百科的图片显示了一个简单的树,它可以帮助我们根据一些属性或属性来预测您是否可能作为泰坦尼克号的乘客生存,其中每个叶子下方的数字显示生存概率和百分比该叶子中的观察结果。 cart-example.png
    CART模型旨在将数据拆分为同类子集 - 您可以从CART模型中得出的基本预测因此只占每个子集中的大多数。根据您现有的CART模型,新的观察结果将属于一个子集,您可以只预测该子集中的现有多数。树有助于使CART模型具有可解释性 - 您可以从树的顶部开始向下移动,向左移动以获得积极响应,向右移动以获得否定响应。顾名思义,CART模型允许您预测连续结果或分类结果。在本文中,我们将使用CART模型来帮助我们将推文归类为对航空公司的否定或非负面即分类问题。
    随机森林
    随机森林旨在通过构建多个CART树(即森林)来进一步提高CART模型的准确性。使用随机森林的缺点是您的模型变得不易理解且解释性较差,但它们可能会提高预测的准确性。为了预测新观察,森林中的每棵树都投票在结果上你可以简单地选择获得最多选票的结果。为了构建随机森林,每个组成树被迫仅从一个自举的数据样本中拆分可用自变量的随机子集。例如,每个树的训练数据随机选择替换 - 因为每个树使用不同的自变量和不同的训练数据,我们生成不同树的森林。下概述了随机森林背后的基本原理。 random-forest-overview.jpg 分类矩阵
    分类(或混淆)矩阵可以帮助我们量化预测模型的准确性。通过将模型(列)中的预测值与实际值(行)进行比较来构建分类矩阵,如下所示: confusion-matrix.png 然后,Matrix中的每个单元格都会提供属于该类别的观察数量。TRUE Negatives和TRUE Positives是模型正确预测的观测数。FALSE Negatives和FALSE Positives是模型预测错误的观察数量。因此,这产生了许多可量化的度量,使我们能够说明模型与其他模型相比的准确度: - 灵敏度或真阳性率 = TP /(TP + FN)测量模型正确预测的阳性结果的百分比。 - 特异性或真阴性率 = TN /(TN + FP)测量模型正确预测的阴性结果的百分比。 - 总体准确度 =(TN + TP)/ N测量给定N个观测值的模型的总体准确度。 - 总误差率 =(FP + FN)/ N测量给定N个观测值的模型的总误差率。 自然语言处理(NLP)
    推文显然是文本性质的。这意味着它们不符合任何预定义的结构,并且可能包括拼写错误,俚语和其他形式的非传统语法。此外,每天都会产生数以亿计的推文,这使得人们无法进行大量的分析。自然语言处理(NLP)是使用计算机理解文本的领域 - 理解人类语言并从中获得意义。在我们将要构建的模型中,单词本身将是我们在CART模型中使用的变量,用于对我们的推文背后的情绪进行分类!
    常用术语
    以下是我的解释中经常出现的一些常用术语供您参考: - 文档 - 文档是单词的定义容器。文档可以是单个记录,文章或推文。 - 语料库 - 语料库是文档的集合。 - 术语频率 - 术语在文档中出现的次数。 - 反向文档频率 - 衡量一个术语在语料库中是常见的还是罕见的度量。例如,术语可以在给定的语料库出现无数次。仅使用术语频率,该术语可以被赋予与其频率成比例的权重,而其他更有意义的术语不一定被赋予更多权重。逆文档频率通过考虑更广泛的语料库来抵消术语的简单频率。 预处理数据
    NLP的基本特征之一是在构建预测模型之前预处理文本数据的想法,以尽可能地标准化来自不同源的文本。常见的预处理技术包括: - Common Casing - 将所有文本转换为小写或大写。 - 无意义的标点符号 - 基本方法是删除所有非字母数字字符。但要小心,因为一些非字母数字字符是有意义的,例如推文中的@和#。因此,应根据您尝试回答的问题来定义删除的非字母数字字符集。 - 停止词语 - 诸如Ithisthere等词语用于表达句子以表达其含义,但不一定有助于确定潜在的情绪。这些停用词通常从文本中删除,还有减少数据集大小的额外好处。 - 词干 - 这是将常用词减少到共同词干的过程。例如,词语连接连接结缔组织连接连接都可以被减少到其的共用杆连接而不去除潜在的意思。这并不是说Stemming没有错误 - 事实上,词干算法容易出错。然而,为了建立预测模型并进一步减小数据集的大小,这是一种有价值的技术。 - Lemmatisation - 虽然Stemming迅速将单词缩减为基本形式,但它没有考虑上下文,因此无法区分具有不同含义的单词,这取决于它在句子或上下文中的位置。Lemmatisation并没有粗略地简化基于共同词干的单词,而是旨在删除屈折结尾只是为了返回一个单词的词典形式,称为引理。例如,词语可以减小到引理虽然Stemmer无法推断出这种语境意义。虽然Lemmatisation可用于更好地保留背景和意义,但它会带来额外的复杂性和时间。 一袋字
    在我们预处理了数据之后,我们如何从仅仅文本中识别出用于我们的预测模型的自变量?好吧,一种方法叫做Bag of Words。在这种方法中,您只需计算每个单词在文本中出现的次数(即该单词的频率),从而为每个单词生成一个可用作基线的功能。然后,每个字频配对成为我们可以在模型中使用的独立变量。
    术语频率 - 逆文档频率(TF-IDF)
    术语频率 - 反向文档频率(TF-IDF)不仅仅是简单地计算每个单词的频率,而是旨在提供一个单词考虑单词在整个数据集中出现的频率的重要性。TF-IDF度量与单词在文档中出现的次数成比例增加,与整个数据集中单词的频率相抵消。
    特征向量
    通过将文本转换为一系列词频配对(如Bag of Words方法)或一系列单词重要性配对(与TF-IDF方法一样),我们实际上将文本转换为称为特征的数字向量向量。这很重要,因为正是这些特征向量被传递到机器学习算法以训练和测试监督预测模型。
    Apache Spark机器学习管道
    在我们开始构建预测模型之前,我想解释一下Apache Spark的机器学习库中的一些核心组件,它们允许我们构建预测模型。 - Transformer - Spark中的Transformer实际上将DataFrame作为其输入并生成新的DataFrame,通常是通过将新列附加到原始DataFrame作为读取和映射操作的结果。例如,在学习模型中,输入DataFrame可以包含包含特征向量的列。然后,变换器可以读取该列并预测每个特征向量的结果(标签),生成包含预测标签的新列。 - Estimator - Spark中的Estimator本质上是一种学习算法,可以训练数据生成模型 - 模型是变换器。例如,可以调用Spark Estimator LogisticRegression来训练训练数据集,生成LogisticRegressionModel,它是生成的Model和Transformer。 - 管道 - Spark中的管道是一系列有序的阶段,每个阶段都是变压器或估算器。 典型的Training Pipeline如下所示: spark-ml-pipeline.png 在上面的示例中,原始文本被拆分以生成每个文档的单词序列。使用某种术语频率算法从这些序列创建特征向量,然后将其传递给估计器以训练模型。
    生成的测试管道可能如下所示: spark-ml-pipeline-model.png 这里,再次拆分新的未见原始文本以生成每个文档的单词序列,并遵循相同的处理步骤。但这一次,上面生成的估计模型应用于新的特征向量以生成预测。通过使用这些管道和管道模型,我们可以构建训练模型,然后将相同的过程应用于我们的基于实时事件的数据流!
    Apache Maven
    现在我们已经介绍了一些基本概念,让我们开始编码!与前面的文章一样,我们将使用Apache Maven来处理构建依赖项。我的POM中需要注意的几件事情是: - ApacheSpark提供的依赖 -我已经被标记为Apache的Spark依赖性提供,因为它们包含在Apache的Spark实例。 - Stanford NLP - 我将使用Stanford NLP Library对文本进行词形还原。这个库是用Java编写的,因此可以在Java和Scala Spark应用程序中使用(我将在Scala中编写我们的机器学习管道)。 ```xml 4.0.0
    1. <groupId>io.keisan.knowledgebase.spark.mllib</groupId>
    2. <artifactId>keisan-spark-mllib</artifactId>
    3. <version>0.0.1-SNAPSHOT</version>
    4. <packaging>jar</packaging>
    5. <!-- Project Information -->
    6. <name>Example Spark Machine Learning Pipelines</name>
    7. <description>Keisan Knowledgebase Spark MLlib Project - Example Spark Machine Learning Pipelines</description>
    8. <url>https://www.keisan.io/knowledgebase/real-time-ml-pipeline-spark</url>
    9. <organization>
    10. <name>Keisan Ltd</name>
    11. <url>https://www.keisan.io</url>
    12. </organization>
    13. <developers>
    14. <developer>
    15. <id>jillur.quddus</id>
    16. <name>Jillur Quddus</name>
    17. <email>contactus@keisan.io</email>
    18. <url>https://www.keisan.io</url>
    19. <organization>Keisan Ltd</organization>
    20. <organizationUrl>https://www.keisan.io</organizationUrl>
    21. <roles>
    22. <role>Lead Engineer</role>
    23. <role>Data Scientist</role>
    24. </roles>
    25. <timezone>Europe/London</timezone>
    26. </developer>
    27. </developers>
    28. <!-- Properties -->
    29. <properties>
    30. <apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
    31. <apache.spark.mllib.2.11.version>2.1.0</apache.spark.mllib.2.11.version>
    32. <apache.spark.sql.2.11.version>2.1.0</apache.spark.sql.2.11.version>
    33. <jdk.version>1.8</jdk.version>
    34. <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
    35. <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
    36. <output.directory>/keisan/knowledgebase/spark/mllib/jars</output.directory>
    37. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    38. <scala.library.version>2.11.8</scala.library.version>
    39. <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    40. <stanford.corenlp.version>3.7.0</stanford.corenlp.version>
    41. </properties>
    42. <!-- Dependencies -->
    43. <dependencies>
    44. <!-- Apache Spark -->
    45. <dependency>
    46. <groupId>org.apache.spark</groupId>
    47. <artifactId>spark-core_2.11</artifactId>
    48. <version>${apache.spark.core.2.11.version}</version>
    49. <scope>provided</scope>
    50. </dependency>
    51. <dependency>
    52. <groupId>org.apache.spark</groupId>
    53. <artifactId>spark-sql_2.11</artifactId>
    54. <version>${apache.spark.sql.2.11.version}</version>
    55. <scope>provided</scope>
    56. </dependency>
    57. <dependency>
    58. <groupId>org.apache.spark</groupId>
    59. <artifactId>spark-mllib_2.11</artifactId>
    60. <version>${apache.spark.mllib.2.11.version}</version>
    61. <scope>provided</scope>
    62. </dependency>
    63. <!-- Scala -->
    64. <dependency>
    65. <groupId>org.scala-lang</groupId>
    66. <artifactId>scala-library</artifactId>
    67. <version>${scala.library.version}</version>
    68. </dependency>
    69. <!-- Stanford NLP -->
    70. <dependency>
    71. <groupId>edu.stanford.nlp</groupId>
    72. <artifactId>stanford-corenlp</artifactId>
    73. <version>${stanford.corenlp.version}</version>
    74. </dependency>
    75. <dependency>
    76. <groupId>edu.stanford.nlp</groupId>
    77. <artifactId>stanford-corenlp</artifactId>
    78. <version>${stanford.corenlp.version}</version>
    79. <classifier>models</classifier>
    80. </dependency>
    81. </dependencies>
    82. <!-- Project Builder -->
    83. <build>
    84. <!-- Plugins -->
    85. <plugins>
    86. <!-- Maven Compiler: Compile the Sources of the Project -->
    87. <plugin>
    88. <groupId>org.apache.maven.plugins</groupId>
    89. <artifactId>maven-compiler-plugin</artifactId>
    90. <version>${maven.plugins.maven-compiler-plugin.version}</version>
    91. <configuration>
    92. <source>${jdk.version}</source>
    93. <target>${jdk.version}</target>
    94. </configuration>
    95. </plugin>
    96. <!-- Maven Assembly: Aggregate project output with its dependencies -->
    97. <plugin>
    98. <groupId>org.apache.maven.plugins</groupId>
    99. <artifactId>maven-assembly-plugin</artifactId>
    100. <version>${maven.plugins.maven-assembly-plugin.version}</version>
    101. <configuration>
    102. <!-- Final JAR Filename -->
    103. <finalName>keisan-spark-mllib-${project.version}</finalName>
    104. <appendAssemblyId>false</appendAssemblyId>
    105. <!-- Include all Project Dependencies -->
    106. <descriptorRefs>
    107. <descriptorRef>jar-with-dependencies</descriptorRef>
    108. </descriptorRefs>
    109. <!-- JAR with dependencies Output Target Directory -->
    110. <outputDirectory>${output.directory}</outputDirectory>
    111. </configuration>
    112. <executions>
    113. <!-- Bind the assembly to the package phase -->
    114. <execution>
    115. <id>make-assembly</id>
    116. <phase>package</phase>
    117. <goals>
    118. <goal>single</goal>
    119. </goals>
    120. </execution>
    121. </executions>
    122. </plugin>
    123. <!-- Scala Maven Plugin -->
    124. <plugin>
    125. <groupId>net.alchim31.maven</groupId>
    126. <artifactId>scala-maven-plugin</artifactId>
    127. <version>${scala.maven.plugin.version}</version>
    128. </plugin>
    129. </plugins>
    130. </build>

    1. **预处理管道**<br />正如我上面提到的,我将使用[Scala](https://www.scala-lang.org/)编写我的Spark Machine学习管道。Scala在JVM上运行,因此允许我们在Scala应用程序中使用用Java编写的库。此外,与Java相比,Scala在开发Spark应用程序时往往不那么冗长,而且对于新数据科学家来说更容易学习。<br />我要执行的第一个编码是在Scala中定义一系列函数,这些函数将允许我们预处理Spark Dataframes中的文本。如上所述,我将定义一系列函数,这些函数将允许我们小写文本,删除无意义的标点符号,删除停用词和lemmatize文本(使用Stanford Core NLP库)。
    2. ```scala
    3. package io.keisan.knowledgebase.spark.mllib;
    4. import java.util.Properties;
    5. import edu.stanford.nlp.ling.CoreAnnotations.LemmaAnnotation;
    6. import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation;
    7. import edu.stanford.nlp.ling.CoreAnnotations.TokensAnnotation;
    8. import edu.stanford.nlp.pipeline.Annotation;
    9. import edu.stanford.nlp.pipeline.StanfordCoreNLP;
    10. import edu.stanford.nlp.util.CoreMap;
    11. import org.apache.spark.sql.Dataset;
    12. import org.apache.spark.sql.Row;
    13. import org.apache.spark.sql.functions._;
    14. import org.apache.spark.sql.types._;
    15. import org.apache.spark.ml.feature.StopWordsRemover;
    16. import scala.collection.JavaConversions._;
    17. import scala.collection.mutable.ArrayBuffer;
    18. /**
    19. * Pre-Processor Utilities / Helper Functions
    20. * A collection of functions to pre-process text
    21. *
    22. * @author jillur.quddus
    23. * @version 0.0.1
    24. */
    25. object PreProcessorUtils {
    26. /**
    27. * Lowercase and Remove Punctuation
    28. * Lowercase and remove non-alphanumeric-space characters from the text field
    29. *
    30. * @param corpus The collection of documents as a Dataframe
    31. * @param textColumn The name of the column containing the text to be pre-processed
    32. * @return A Dataframe with the text lowercased and non-alphanumeric-space characters removed
    33. *
    34. */
    35. def lowercaseRemovePunctuation(corpus:Dataset[Row], textColumn:String): Dataset[Row] = {
    36. return corpus.withColumn(textColumn, regexp_replace(lower(corpus(textColumn)), "[^a-zA-Z0-9 ]", ""));
    37. }
    38. /**
    39. * Text Lemmatizer
    40. * Given a text string, generate a sequence of Lemmas
    41. *
    42. * @param text The text string to lemmatize
    43. * @param pipeline The Stanford Core NLP Pipeline
    44. * @return A sequence of lemmas
    45. *
    46. */
    47. def lemmatizeText(text: String, pipeline:StanfordCoreNLP): Seq[String] = {
    48. val doc = new Annotation(text);
    49. pipeline.annotate(doc);
    50. val lemmas = new ArrayBuffer[String]();
    51. val sentences = doc.get(classOf[SentencesAnnotation]);
    52. for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
    53. val lemma = token.get(classOf[LemmaAnnotation])
    54. if (lemma.length > 2 && isOnlyLetters(lemma)) {
    55. lemmas += lemma.toLowerCase
    56. }
    57. }
    58. return lemmas;
    59. }
    60. /**
    61. * Check that a given String is made up entirely of alpha characters
    62. *
    63. * @param str The string to test for alpha characters
    64. * @return Boolean
    65. */
    66. def isOnlyLetters(str: String) = str.forall(c => Character.isLetter(c));
    67. /**
    68. * Stop Words Remover
    69. * Remove Stop Words from a given input column containing a sequence of String
    70. *
    71. * @param corpus The collection of documents as a Dataframe
    72. * @param inputColumn The name of the column containing a sequence of Strings to filter
    73. * @param outputColumn The name of the column to output the filtered sequence of Strings to
    74. */
    75. def stopWordRemover(corpus:Dataset[Row], inputColumn:String, outputColumn:String): Dataset[Row] = {
    76. val stopWordsRemover = new StopWordsRemover()
    77. .setInputCol(inputColumn)
    78. .setOutputCol(outputColumn);
    79. return stopWordsRemover.transform(corpus);
    80. }
    81. }

    摄取,标记和预处理数据
    我们现在可以提取我们从CrowdFlower下载的数据。我们已经下载的数据包含了标签栏(即)的结果,即airline_sentiment要么是正面,负面或中立的。我们可以保留标签,即多分类。但是,我们通过使用二进制分类标记数据(负面或非负面)来使其更简单一些。然后,我们的模型将能够预测推文是否只是对有问题的航空公司是负面的。要做到这一点,在我的代码下面我创建了一个新的String列,被称为negative_sentiment_label要么是真正的负情绪推特(我们取得积极成果)或在所有其他情况下都是假的(我们的负面结果,其中推文被分类为情绪的自然或积极)。在我们标记了数据之后,我们可以应用上面定义的预处理技术来删除非含义标点符号,将我们的文本解释并删除停用词。

    1. object TrainDecisionTreeClassifier {
    2. def main(args: Array[String]) = {
    3. /********************************************************************
    4. * SPARK CONTEXT
    5. ********************************************************************/
    6. // Create the Spark Context
    7. val conf = new SparkConf()
    8. .setAppName("Sentiment Models")
    9. .setMaster("spark://<Spark Master Hostname>:7077");
    10. val sc = new SparkContext(conf);
    11. val sparkSession = SparkSession.builder().getOrCreate();
    12. import sparkSession.implicits._;
    13. /********************************************************************
    14. * INGEST THE CORPUS
    15. ********************************************************************/
    16. // Define the CSV Dataset Schema
    17. val schema = new StructType(Array(
    18. StructField("unit_id", LongType, true),
    19. StructField("golden", BooleanType, true),
    20. StructField("unit_state", StringType, true),
    21. StructField("trusted_judgements", IntegerType, true),
    22. StructField("last_judgement_at", StringType, true),
    23. StructField("airline_sentiment", StringType, true),
    24. StructField("airline_sentiment_confidence", DoubleType, true),
    25. StructField("negative_reason", StringType, true),
    26. StructField("negative_reason_confidence", DoubleType, true),
    27. StructField("airline", StringType, true),
    28. StructField("airline_sentiment_gold", StringType, true),
    29. StructField("name", StringType, true),
    30. StructField("negative_reason_gold", StringType, true),
    31. StructField("retweet_count", IntegerType, true),
    32. StructField("text", StringType, true),
    33. StructField("tweet_coordinates", StringType, true),
    34. StructField("tweet_created", StringType, true),
    35. StructField("tweet_id", StringType, true),
    36. StructField("tweet_location", StringType, true),
    37. StructField("user_timezone", StringType, true)
    38. ));
    39. // Read the CSV Dataset, keeping only those columns that we need to build our model
    40. var tweetsDF = SparkSession.builder().getOrCreate().read
    41. .format("csv")
    42. .option("header", true)
    43. .option("delimiter", ",")
    44. .option("mode", "DROPMALFORMED")
    45. .schema(schema)
    46. .load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/raw/Airline-Sentiment-2-w-AA.csv")
    47. .select("airline_sentiment", "text");
    48. /********************************************************************
    49. * LABEL THE DATA
    50. ********************************************************************/
    51. // We are interested in detecting tweets with negative sentiment. Let us create a new column whereby
    52. // if the sentiment is negative, this new column is TRUE (Positive Outcome), and FALSE (Negative Outcome)
    53. // in all other cases
    54. tweetsDF = tweetsDF.withColumn("negative_sentiment_label", when(tweetsDF("airline_sentiment") === "negative", lit("true")).otherwise(lit("false")))
    55. .select("text", "negative_sentiment_label");
    56. /********************************************************************
    57. * APPLY THE PRE-PROCESSING PIPELINE
    58. ********************************************************************/
    59. // Let us now perform some simple pre-processing including converting the text column to lowercase
    60. // and removing all non-alphanumeric characters
    61. val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");
    62. // Lemmatize the text to generate a sequence of Lemmas using the Stanford NLP Library
    63. // By using mapPartitions, we create the Stanford NLP Pipeline once per partition rather than once per RDD entry
    64. val lemmatizedDF = lowercasedDF.select("text", "negative_sentiment_label").rdd.mapPartitions(p => {
    65. // Define the NLP Lemmatizer Pipeline once per partition
    66. val props = new Properties();
    67. props.put("annotators", "tokenize, ssplit, pos, lemma");
    68. val pipeline = new StanfordCoreNLP(props);
    69. // Lemmatize the text and preserve the Negative Sentiment Label
    70. p.map{
    71. case Row(text: String, negative_sentiment_label:String) => (PreProcessorUtils.lemmatizeText(text, pipeline), negative_sentiment_label);
    72. };
    73. }).toDF("lemmas", "negative_sentiment_label");
    74. // Remove Stop Words from the sequence of Lemmas
    75. val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
    76. .where(size(col("filtered_lemmas")) > 1);

    现在,如果你运行动作stopWordsRemovedDF.show(false),你应该看到一个类似于下面的DataFrame:

    preprocessed-training-data.png

    希望您应该看到3列 - 从原始文本派生的词条序列,删除了停用词的过滤的词条序列,以及我们的标签列,对于负面情绪推文是真的,对于所有其他情况都是的。
    特征向量和训练模型
    我们现在准备生成我们的训练模型。第一步是将我们过滤的引理序列转换为特征向量,以便我们可以将它们传递给Estimator以生成我们的训练模型。如上所述,我们将使用TF-IDF方法生成特征向量,其中考虑术语对语料库中文档的重要性,而不仅仅考虑术语频率(TF)。
    为了使事情更正式,我们可以说术语频率TF(t,d)是术语t出现在文档d中的次数。我们可以说文档频率DF(t,D)是我们的语料库D中包含术语t的文档数。然而,正如我上面所描述的那样,简单地使用TF可能过分强调经常出现但没有表达意义的术语。我们可以说反向文档频率IDF(t,D)是一个数字度量,通过考虑术语在语料库中出现的频率,术语的重要程度,可以计算如下:

    idf.png

    在这个等式中,| D | 是语料库D中的文档总数。并且由于使用了对数,如果一个术语出现在所有文档中,其IDF值将变为0.要计算最终的TF-IDF度量,您可以将TF乘以IDF,如下所示:

    tfidf.png

    在Spark的MLlib库中,Term Frequency TF向量可以使用Transformer HashingTF生成,它采用一系列术语并将它们转换为固定长度的特征向量。然后可以将这些特征向量传递给IDFModelIDFModel由适合数据集的IDF估计器生成,以根据跨语料库的频率对列进行缩放。
    一旦我们从引理序列生成了缩放的特征向量,我们就可以将它们传递给Estimators来构建我们的训练模型。在本文中,我们将基于决策树分类器构建训练模型估计。在下面的代码中,我定义了一个新的Scala对象来保存模型助手函数。第一个函数将从一系列(过滤的)引理中生成缩放的特征向量。第二个函数将使用这些缩放的特征向量和先前定义的标签来训练和输出决策树分类器。第三个函数将再次使用这些缩放的特征向量和先前定义的标签来训练和输出随机森林分类器。第四个功能将允许我们生成指标,例如我们的模型精度,从应用我们的训练模型到测试数据。

    1. package io.keisan.knowledgebase.spark.mllib;
    2. import org.apache.spark.sql.Dataset;
    3. import org.apache.spark.sql.functions._;
    4. import org.apache.spark.sql.types._;
    5. import org.apache.spark.sql.Row;
    6. import org.apache.spark.ml.Pipeline;
    7. import org.apache.spark.ml.PipelineModel;
    8. import org.apache.spark.ml.classification.DecisionTreeClassifier;
    9. import org.apache.spark.ml.classification.RandomForestClassifier;
    10. import org.apache.spark.ml.feature.IndexToString;
    11. import org.apache.spark.ml.feature.StringIndexer;
    12. import org.apache.spark.ml.feature.HashingTF;
    13. import org.apache.spark.ml.feature.IDF;
    14. import org.apache.spark.mllib.evaluation.MulticlassMetrics;
    15. import scala.collection.JavaConversions._;
    16. /**
    17. * Predictive Model Utilities / Helper Functions
    18. * A collection of functions to build Predictive Models
    19. *
    20. * @author jillur.quddus
    21. * @version 0.0.1
    22. */
    23. object ModelUtils {
    24. // Size of the fixed-length Feature Vectors
    25. val numFeatures = 4096;
    26. // Number of trees in our random forests
    27. val numTrees = 256;
    28. /**
    29. * Term Frequency-Inverse Document Frequency (TF-IDF)
    30. * Generate Term Frequency Feature Vectors by passing the sequence of lemmas to the HashingTF Transformer.
    31. * Fit the IDF Estimator to the Featurized Dataset to generate the IDFModel.
    32. * Pass the TF Feature Vectors to the IDFModel to scale based on frequency across the corpus
    33. *
    34. * @param corpus Dataset containing the sequence of lemmas
    35. * @param inputColumn The name of the column containing the sequence of (filtered) lemmas
    36. * @param outputColumn The name of the column to store the Scaled Feature Vectors
    37. * @return A DataFrame with the Scaled Feature Vectors
    38. *
    39. */
    40. def tfidf(corpus:Dataset[Row], inputColumn:String, outputColumn:String): Dataset[Row] = {
    41. // Convert the sequence of Lemmas into fixed-length feature vectors using the HashingTF Transformer
    42. val hashingTF = new HashingTF()
    43. .setInputCol(inputColumn)
    44. .setOutputCol("raw_features")
    45. .setNumFeatures(numFeatures);
    46. val featurizedData = hashingTF.transform(corpus);
    47. // Takes the feature vectors and scale each column based on how frequently it appears in the corpus
    48. val idf = new IDF().setInputCol("raw_features").setOutputCol(outputColumn);
    49. val idfModel = idf.fit(featurizedData);
    50. return idfModel.transform(featurizedData);
    51. }
    52. /**
    53. * Build a Decision Tree Classifier
    54. * Train a Decision Tree Model by supplying the training dataset that includes the label and feature vector columns
    55. *
    56. * @param featuresDF The full DataFrame containing the labels and feature vectors
    57. * @param trainingDF The training split DataFrame to be used to train the Model
    58. * @param labelColumn The name of the column containing the labels
    59. * @param featuresColumn The name of the column containing the scaled feature vectors
    60. * @return The PipelineModel containing our trained decision tree model
    61. *
    62. */
    63. def trainDecisionTreeModel(featurizedDF:Dataset[Row], trainingDF:Dataset[Row], labelColumn:String,
    64. featuresColumn:String): PipelineModel = {
    65. // Index the Labels
    66. val labelIndexer = new StringIndexer()
    67. .setInputCol(labelColumn)
    68. .setOutputCol("indexed_label")
    69. .fit(featurizedDF);
    70. // Define the Decision Tree Model
    71. val decisionTreeModel = new DecisionTreeClassifier()
    72. .setLabelCol("indexed_label")
    73. .setFeaturesCol(featuresColumn);
    74. // Convert the Indexed Labels back to the original Labels based on the trained predictions
    75. val labelConverter = new IndexToString()
    76. .setInputCol("prediction")
    77. .setOutputCol("predicted_label")
    78. .setLabels(labelIndexer.labels);
    79. // Chain the Indexers and Decision Tree Model to form a Pipeline
    80. val pipeline = new Pipeline()
    81. .setStages(Array(labelIndexer, decisionTreeModel, labelConverter));
    82. // Run the Indexers and Train the Model on the Training Data
    83. return pipeline.fit(trainingDF);
    84. }
    85. /**
    86. * Build a Random Forest Classifier
    87. * Train a Random Forest Model by supplying the training dataset that includes the label and feature vector columns
    88. *
    89. * @param featuresDF The full DataFrame containing the labels and feature vectors
    90. * @param trainingDF The training split DataFrame to be used to train the Model
    91. * @param labelColumn The name of the column containing the labels
    92. * @param featuresColumn The name of the column containing the scaled feature vectors
    93. * @return The PipelineModel containing our trained random forest model
    94. *
    95. */
    96. def trainRandomForestModel(featurizedDF:Dataset[Row], trainingDF:Dataset[Row], labelColumn:String,
    97. featuresColumn:String): PipelineModel = {
    98. // Index the Labels
    99. val labelIndexer = new StringIndexer()
    100. .setInputCol(labelColumn)
    101. .setOutputCol("indexed_label")
    102. .fit(featurizedDF);
    103. // Define a Random Forest model
    104. val randomForestModel = new RandomForestClassifier()
    105. .setLabelCol("indexed_label")
    106. .setFeaturesCol(featuresColumn)
    107. .setNumTrees(numTrees);
    108. // Convert the Indexed Labels back to the original Labels based on the trained predictions
    109. val labelConverter = new IndexToString()
    110. .setInputCol("prediction")
    111. .setOutputCol("predicted_label")
    112. .setLabels(labelIndexer.labels);
    113. // Chain the Indexers and Random Forest Model to form a Pipeline
    114. val pipeline = new Pipeline()
    115. .setStages(Array(labelIndexer, randomForestModel, labelConverter));
    116. // Run the Indexers and Train the Model on the Training Data
    117. return pipeline.fit(trainingDF);
    118. }
    119. /**
    120. * Generate Multi-class Metrics
    121. * Generate multi-class metrics given a predictions dataframe containing prediction and indexed label double columns.
    122. * Such metrics allow us to generate classification matrices, false and true positive rates etc.
    123. *
    124. * @param predictionsDF A DataFrame containing predictions and indexed labels
    125. * @param predictionColumn The name of the column containing the predictions [Double]
    126. * @param indexedLabelColumn The name of the column containing the indexed labels [Double]
    127. * @return A MulticlassMetrics object that can be used to output model metrics
    128. *
    129. */
    130. def generateMulticlassMetrics(predictionsDF:Dataset[Row], predictionColumn:String, indexedLabelColumn:String): MulticlassMetrics = {
    131. val predictionAndLabels = predictionsDF.select(predictionColumn, indexedLabelColumn).rdd.map{
    132. case Row(predictionColumn: Double, indexedLabelColumn:Double) => (predictionColumn, indexedLabelColumn);
    133. };
    134. return new MulticlassMetrics(predictionAndLabels);
    135. }
    136. }

    训练决策树分类器
    现在让我们返回主应用程序并通过执行以下操作完成它:

    • 从我们的滤波Lemmas序列生成缩放的特征向量
    • 随机将我们的语料库分成训练(70%)和测试(30%)数据集
    • 在训练数据集上训练决策树分类器估计器
    • 将生成的决策树分类器模型应用于测试数据集,允许我们生成有关模型准确性的指标
    • 将我们的决策树分类器模型保存到HDFS,以便稍后在我们的实时基于事件的流数据管道中使用
    1. package io.keisan.knowledgebase.spark.mllib;
    2. import edu.stanford.nlp.pipeline.StanfordCoreNLP;
    3. import java.util.Properties;
    4. import org.apache.spark.SparkConf;
    5. import org.apache.spark.SparkContext;
    6. import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
    7. import org.apache.spark.sql.Dataset;
    8. import org.apache.spark.sql.functions._;
    9. import org.apache.spark.sql.types._;
    10. import org.apache.spark.sql.Row;
    11. import org.apache.spark.sql.SparkSession;
    12. import org.apache.spark.sql.SparkSession.Builder;
    13. import scala.collection.JavaConversions._;
    14. /**
    15. * Tweet Sentiment Decision Tree Classifier
    16. * Train a Decision Tree Classifier on a collection of pre-labelled tweets about airlines
    17. *
    18. * @author jillur.quddus
    19. * @version 0.0.1
    20. */
    21. object TrainDecisionTreeClassifier {
    22. def main(args: Array[String]) = {
    23. /********************************************************************
    24. * SPARK CONTEXT
    25. ********************************************************************/
    26. // Create the Spark Context
    27. val conf = new SparkConf()
    28. .setAppName("Sentiment Models")
    29. .setMaster("spark://<Spark Master Hostname>:7077");
    30. val sc = new SparkContext(conf);
    31. val sparkSession = SparkSession.builder().getOrCreate();
    32. import sparkSession.implicits._;
    33. /********************************************************************
    34. * INGEST THE CORPUS
    35. ********************************************************************/
    36. // Define the CSV Dataset Schema
    37. val schema = new StructType(Array(
    38. StructField("unit_id", LongType, true),
    39. StructField("golden", BooleanType, true),
    40. StructField("unit_state", StringType, true),
    41. StructField("trusted_judgements", IntegerType, true),
    42. StructField("last_judgement_at", StringType, true),
    43. StructField("airline_sentiment", StringType, true),
    44. StructField("airline_sentiment_confidence", DoubleType, true),
    45. StructField("negative_reason", StringType, true),
    46. StructField("negative_reason_confidence", DoubleType, true),
    47. StructField("airline", StringType, true),
    48. StructField("airline_sentiment_gold", StringType, true),
    49. StructField("name", StringType, true),
    50. StructField("negative_reason_gold", StringType, true),
    51. StructField("retweet_count", IntegerType, true),
    52. StructField("text", StringType, true),
    53. StructField("tweet_coordinates", StringType, true),
    54. StructField("tweet_created", StringType, true),
    55. StructField("tweet_id", StringType, true),
    56. StructField("tweet_location", StringType, true),
    57. StructField("user_timezone", StringType, true)
    58. ));
    59. // Read the CSV Dataset, keeping only those columns that we need to build our model
    60. var tweetsDF = SparkSession.builder().getOrCreate().read
    61. .format("csv")
    62. .option("header", true)
    63. .option("delimiter", ",")
    64. .option("mode", "DROPMALFORMED")
    65. .schema(schema)
    66. .load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/raw/Airline-Sentiment-2-w-AA.csv")
    67. .select("airline_sentiment", "text");
    68. /********************************************************************
    69. * LABEL THE DATA
    70. ********************************************************************/
    71. // We are interested in detecting tweets with negative sentiment. Let us create a new column whereby
    72. // if the sentiment is negative, this new column is TRUE (Positive Outcome), and FALSE (Negative Outcome)
    73. // in all other cases
    74. tweetsDF = tweetsDF.withColumn("negative_sentiment_label", when(tweetsDF("airline_sentiment") === "negative", lit("true")).otherwise(lit("false")))
    75. .select("text", "negative_sentiment_label");
    76. /********************************************************************
    77. * APPLY THE PRE-PROCESSING PIPELINE
    78. ********************************************************************/
    79. // Let us now perform some simple pre-processing including converting the text column to lowercase
    80. // and removing all non-alphanumeric characters
    81. val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");
    82. // Lemmatize the text to generate a sequence of Lemmas using the Stanford NLP Library
    83. // By using mapPartitions, we create the Stanford NLP Pipeline once per partition rather than once per RDD entry
    84. val lemmatizedDF = lowercasedDF.select("text", "negative_sentiment_label").rdd.mapPartitions(p => {
    85. // Define the NLP Lemmatizer Pipeline once per partition
    86. val props = new Properties();
    87. props.put("annotators", "tokenize, ssplit, pos, lemma");
    88. val pipeline = new StanfordCoreNLP(props);
    89. // Lemmatize the text and preserve the Negative Sentiment Label
    90. p.map{
    91. case Row(text: String, negative_sentiment_label:String) => (PreProcessorUtils.lemmatizeText(text, pipeline), negative_sentiment_label);
    92. };
    93. }).toDF("lemmas", "negative_sentiment_label");
    94. // Remove Stop Words from the sequence of Lemmas
    95. val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
    96. .where(size(col("filtered_lemmas")) > 1);
    97. /********************************************************************
    98. * SCALED FEATURE VECTOR
    99. ********************************************************************/
    100. // Generate the Scaled Feature Vectors
    101. val featurizedDF = ModelUtils.tfidf(stopWordsRemovedDF, "filtered_lemmas", "features");
    102. /********************************************************************
    103. * TRAIN AND EVALUATE A DECISION TREE CLASSIFIER
    104. ********************************************************************/
    105. // Split the data into Training and Test Datasets
    106. val Array(trainingDF, testDF) = featurizedDF.randomSplit(Array(0.7, 0.3))
    107. // Train a Decision Tree Model using the Training Dataset
    108. val decisionTreeModel = ModelUtils.trainDecisionTreeModel(featurizedDF, trainingDF, "negative_sentiment_label", "features");
    109. // Apply the Decision Tree Training Model to the Test Dataset
    110. val decisionTreePredictions = decisionTreeModel.transform(testDF);
    111. decisionTreePredictions.select("negative_sentiment_label", "predicted_label", "filtered_lemmas", "features").show(false);
    112. // Compute the accuracy of the Decision Tree Training Model on the Test Dataset
    113. val decisionTreeEvaluator = new MulticlassClassificationEvaluator()
    114. .setLabelCol("indexed_label")
    115. .setPredictionCol("prediction")
    116. .setMetricName("accuracy");
    117. val decisionTreeAccuracy = decisionTreeEvaluator.evaluate(decisionTreePredictions);
    118. println("Decision Tree Test Accuracy Rate = " + decisionTreeAccuracy);
    119. println("Decision Tree Test Error Rate = " + (1.0 - decisionTreeAccuracy));
    120. // Generate a Classification Matrix
    121. val metrics = ModelUtils.generateMulticlassMetrics(decisionTreePredictions, "prediction", "indexed_label");
    122. println(metrics.confusionMatrix);
    123. // Generate Label Accuracy Metrics
    124. val labelMetrics = metrics.labels;
    125. labelMetrics.foreach { l =>
    126. println(s"False Positive Rate ($l) = " + metrics.falsePositiveRate(l));
    127. }
    128. /********************************************************************
    129. * SAVE THE DECISION TREE CLASSIFIER FOR REAL-TIME STREAMING
    130. ********************************************************************/
    131. decisionTreeModel.save("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");
    132. }
    133. }

    多级度量标准
    上面的应用程序将打印一些关于我们训练的决策树分类器模型在测试数据集上的准确性的指标。确切的数字可能与您的略有不同,因为我们随机分割语料库,但通常您应该看到以下结果:

    • 测试准确率:71%
    • 测试错误率:29%,

    和以下分类矩阵

    2285.0 293.0
    945.0 681.0

    该分类矩阵告诉我们,在4,204次测试观察中,经过训练的决策树分类器模型得到2,285个预测正确,从而预测负面情绪,测试推文实际上是负面情绪(真实肯定)。该模型得到了进一步的681个预测正确,从而预测了积极的情绪,测试推文实际上是积极的情绪(真阴性)。因此,在总共4,204次测试观察中,该模型得到2,966个预测正确 - 准确率为71%。
    然而,通过遵循分类矩阵中的相反对角线,模型得到293个预测不正确,从而预测正面情绪,但测试推文实际上是负面情绪(假阴性)。该模型得到了进一步的945预测不正确,因此它预测负面情绪,但测试推文实际上是积极的情绪(误报)。因此,在总共4,204次测试观察中,该模型得出1,238个预测错误 - 错误率为29%。
    因此,我们的决策树分类器在71%的时间内正确地预测了推文的情绪。我们现在将我们训练有素的模型应用于我们基于事件的实时流媒体数据管道,以尝试预测航空公司推文创建时的情绪!
    Apache Kafka 生产者
    在我的上一篇文章中,我们更新了我们的数据管道,使用Apache Kafka Producer在Twitter4j包装器中使用Twitter的Hosebird客户端收集推文,其中推文使用Avro库和Bijection进行序列化。如果您运行该Producer,您应该观察发布到Kafka主题的推文。
    Apache Spark消费者流应用程序
    我们现在要创建一个Apache Spark Consumer Streaming Application,它将使用来自Kafka主题的Avro推文。然后,我们将扩展我们的高级实时数据处理流程,方法是将这些推文传递给上述相同的预处理管道,然后再对其应用我们经过训练的决策树分类器,尝试近乎实时地对其潜在情绪进行分类。 !

    image.jpeg

    我将在我们上面训练决策树分类器的同一个项目中编写Spark Consumer Streaming Application。或者,您可以为Consumer Streaming Application创建一个新项目 - 这取决于您。由于我将使用相同的项目,我需要更新我的Maven POM以包含一些额外的依赖项 - 即Apache Avro,Apache Spark Streaming(已提供),Apache Kafka(请注意,我将使用与之兼容的0.8.2.2版本)我从上一篇文章中得到了 Spark 2.1.0 Streaming,Bijection和我的Kafka Twitter Producer应用程序(在这里我定义了Avro Tweet Schema,当我从Kafka主题消费并解析消息时我将再次需要它):

    1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    3. <modelVersion>4.0.0</modelVersion>
    4. <!-- Project Information -->
    5. <groupId>io.keisan.knowledgebase.spark.mllib</groupId>
    6. <artifactId>keisan-spark-mllib</artifactId>
    7. <version>0.0.1-SNAPSHOT</version>
    8. <packaging>jar</packaging>
    9. <!-- Project Information -->
    10. <name>Example Spark Predictive Models</name>
    11. <description>Keisan Knowledgebase Spark MLlib Project - Example Spark Machine Learning Pipelines</description>
    12. <url>https://www.keisan.io/knowledgebase/real-time-ml-pipeline-spark</url>
    13. <organization>
    14. <name>Keisan Ltd</name>
    15. <url>https://www.keisan.io</url>
    16. </organization>
    17. <developers>
    18. <developer>
    19. <id>jillur.quddus</id>
    20. <name>Jillur Quddus</name>
    21. <email>contactus@keisan.io</email>
    22. <url>https://www.keisan.io</url>
    23. <organization>Keisan Ltd</organization>
    24. <organizationUrl>https://www.keisan.io</organizationUrl>
    25. <roles>
    26. <role>Lead Engineer</role>
    27. <role>Data Scientist</role>
    28. </roles>
    29. <timezone>Europe/London</timezone>
    30. </developer>
    31. </developers>
    32. <!-- Properties -->
    33. <properties>
    34. <apache.avro.version>1.8.1</apache.avro.version>
    35. <apache.kafka.2.11.version>0.8.2.2</apache.kafka.2.11.version>
    36. <apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
    37. <apache.spark.mllib.2.11.version>2.1.0</apache.spark.mllib.2.11.version>
    38. <apache.spark.sql.2.11.version>2.1.0</apache.spark.sql.2.11.version>
    39. <apache.spark.streaming.2.11.version>2.1.0</apache.spark.streaming.2.11.version>
    40. <apache.spark.streaming.kafka-0-8_2.11.version>2.1.0</apache.spark.streaming.kafka-0-8_2.11.version>
    41. <bijection.avro.2.11.version>0.9.5</bijection.avro.2.11.version>
    42. <jdk.version>1.8</jdk.version>
    43. <keisan.kafka.producers.version>0.0.1-SNAPSHOT</keisan.kafka.producers.version>
    44. <maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
    45. <maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
    46. <output.directory>/keisan/knowledgebase/spark/mllib/jars</output.directory>
    47. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    48. <scala.library.version>2.11.8</scala.library.version>
    49. <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    50. <stanford.corenlp.version>3.7.0</stanford.corenlp.version>
    51. </properties>
    52. <!-- Dependencies -->
    53. <dependencies>
    54. <!-- Apache Avro -->
    55. <dependency>
    56. <groupId>org.apache.avro</groupId>
    57. <artifactId>avro</artifactId>
    58. <version>${apache.avro.version}</version>
    59. </dependency>
    60. <!-- Apache Kafka -->
    61. <dependency>
    62. <groupId>org.apache.kafka</groupId>
    63. <artifactId>kafka_2.11</artifactId>
    64. <version>${apache.kafka.2.11.version}</version>
    65. </dependency>
    66. <!-- Apache Spark -->
    67. <dependency>
    68. <groupId>org.apache.spark</groupId>
    69. <artifactId>spark-core_2.11</artifactId>
    70. <version>${apache.spark.core.2.11.version}</version>
    71. <scope>provided</scope>
    72. </dependency>
    73. <dependency>
    74. <groupId>org.apache.spark</groupId>
    75. <artifactId>spark-mllib_2.11</artifactId>
    76. <version>${apache.spark.mllib.2.11.version}</version>
    77. <scope>provided</scope>
    78. </dependency>
    79. <dependency>
    80. <groupId>org.apache.spark</groupId>
    81. <artifactId>spark-sql_2.11</artifactId>
    82. <version>${apache.spark.sql.2.11.version}</version>
    83. <scope>provided</scope>
    84. </dependency>
    85. <dependency>
    86. <groupId>org.apache.spark</groupId>
    87. <artifactId>spark-streaming_2.11</artifactId>
    88. <version>${apache.spark.streaming.2.11.version}</version>
    89. <scope>provided</scope>
    90. </dependency>
    91. <dependency>
    92. <groupId>org.apache.spark</groupId>
    93. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    94. <version>${apache.spark.streaming.kafka-0-8_2.11.version}</version>
    95. </dependency>
    96. <!-- Bijection -->
    97. <dependency>
    98. <groupId>com.twitter</groupId>
    99. <artifactId>bijection-avro_2.11</artifactId>
    100. <version>${bijection.avro.2.11.version}</version>
    101. </dependency>
    102. <!-- Keisan Kafka Producers -->
    103. <dependency>
    104. <groupId>io.keisan.knowledgebase.kafka.producers</groupId>
    105. <artifactId>keisan-kafka-producers</artifactId>
    106. <version>${keisan.kafka.producers.version}</version>
    107. </dependency>
    108. <!-- Scala -->
    109. <dependency>
    110. <groupId>org.scala-lang</groupId>
    111. <artifactId>scala-library</artifactId>
    112. <version>${scala.library.version}</version>
    113. </dependency>
    114. <!-- Stanford NLP -->
    115. <dependency>
    116. <groupId>edu.stanford.nlp</groupId>
    117. <artifactId>stanford-corenlp</artifactId>
    118. <version>${stanford.corenlp.version}</version>
    119. </dependency>
    120. <dependency>
    121. <groupId>edu.stanford.nlp</groupId>
    122. <artifactId>stanford-corenlp</artifactId>
    123. <version>${stanford.corenlp.version}</version>
    124. <classifier>models</classifier>
    125. </dependency>
    126. </dependencies>
    127. <!-- Project Builder -->
    128. <build>
    129. <!-- Plugins -->
    130. <plugins>
    131. <!-- Maven Compiler: Compile the Sources of the Project -->
    132. <plugin>
    133. <groupId>org.apache.maven.plugins</groupId>
    134. <artifactId>maven-compiler-plugin</artifactId>
    135. <version>${maven.plugins.maven-compiler-plugin.version}</version>
    136. <configuration>
    137. <source>${jdk.version}</source>
    138. <target>${jdk.version}</target>
    139. </configuration>
    140. </plugin>
    141. <!-- Maven Assembly: Aggregate project output with its dependencies -->
    142. <plugin>
    143. <groupId>org.apache.maven.plugins</groupId>
    144. <artifactId>maven-assembly-plugin</artifactId>
    145. <version>${maven.plugins.maven-assembly-plugin.version}</version>
    146. <configuration>
    147. <!-- Final JAR Filename -->
    148. <finalName>keisan-spark-mllib-${project.version}</finalName>
    149. <appendAssemblyId>false</appendAssemblyId>
    150. <!-- Include all Project Dependencies -->
    151. <descriptorRefs>
    152. <descriptorRef>jar-with-dependencies</descriptorRef>
    153. </descriptorRefs>
    154. <!-- JAR with dependencies Output Target Directory -->
    155. <outputDirectory>${output.directory}</outputDirectory>
    156. </configuration>
    157. <executions>
    158. <!-- Bind the assembly to the package phase -->
    159. <execution>
    160. <id>make-assembly</id>
    161. <phase>package</phase>
    162. <goals>
    163. <goal>single</goal>
    164. </goals>
    165. </execution>
    166. </executions>
    167. </plugin>
    168. <!-- Scala Maven Plugin -->
    169. <plugin>
    170. <groupId>net.alchim31.maven</groupId>
    171. <artifactId>scala-maven-plugin</artifactId>
    172. <version>${scala.maven.plugin.version}</version>
    173. </plugin>
    174. </plugins>
    175. </build>
    176. </project>

    Spark Streaming Context
    我将在Scala中编写Spark消费者流应用程序,但Java等效项应该在结构上类似,尽管更冗长。首先要做的是创建Spark Streaming Context。我将把正常的Spark Context传递给Spark Streaming Context Constructors之一来创建Streaming Context,在我的例子中使用5秒的批处理间隔。

    1. // Spark Streaming Context
    2. val conf = new SparkConf()
    3. .setAppName("Streaming Twitter Sentiment Classifier")
    4. .setMaster("spark://<Spark Master Hostname>:7077");
    5. val sc = new SparkContext(conf);
    6. val ssc = new StreamingContext(sc, Seconds(5));
    7. val sparkSession = SparkSession.builder().getOrCreate();
    8. import sparkSession.implicits._;

    加载管道模型
    第二步是将我们先前持久化的训练决策树分类器流水线模型加载到HDFS。要做到这一点很简单:

    1. // Load the Trained Decision Tree Classifier
    2. val decisionTreeModel = PipelineModel.read.load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");

    实时流媒体情感分析
    我们现在准备编写我们的主要Consumer Streaming应用程序。我们的Spark Streaming应用程序将执行以下操作:

    • Kafka直接流 - 我们将使用默认解码器定期向Kafka查询每个主题和分区中的最新偏移量,以返回表示我们的Avro推文的原始字节数组,从而产生离散化的流。
    • Bijection - 然后我们将通过使用Bijection和我的Kafka Twitter Producer中定义的Tweet Schema反转原始字节数组来开始处理离散化的流,从而产生一个RDD of Rows,然后我们将转换为包含我们解析的tweet字段的DataFrame,包括推文的核心文本。
    • 预处理 - 接下来,我们将对我们在训练决策树分类器时应用的这些DataFrame应用相同的预处理管道,即小写文本,删除无意义的标点符号,使用斯坦福的Core NLP库对文本进行词典化,最后删除从由此产生的引理序列中停止。
    • 缩放特征向量 - 接下来,我们将使用TF-IDF从这些过滤的词条序列生成缩放特征向量。
    • 决策树分类器预测 - 最后,我们将这些缩放的特征向量传递给我们之前创建和加载的训练过的决策树分类器,以预测推文中的推文是负面还是非负面!
    1. package io.keisan.knowledgebase.spark.mllib;
    2. import com.twitter.bijection.Injection;
    3. import com.twitter.bijection.avro.GenericAvroCodecs;
    4. import edu.stanford.nlp.pipeline.StanfordCoreNLP;
    5. import io.keisan.knowledgebase.kafka.producers.TwitterProducer;
    6. import java.util.HashMap;
    7. import java.util.Properties;
    8. import kafka.serializer.DefaultDecoder;
    9. import kafka.serializer.StringDecoder;
    10. import org.apache.avro.Schema;
    11. import org.apache.avro.generic.GenericRecord;
    12. import org.apache.spark.ml.PipelineModel;
    13. import org.apache.spark.rdd.RDD;
    14. import org.apache.spark.SparkConf;
    15. import org.apache.spark.SparkContext;
    16. import org.apache.spark.sql.Dataset;
    17. import org.apache.spark.sql.Row;
    18. import org.apache.spark.sql.SparkSession;
    19. import org.apache.spark.sql.functions._;
    20. import org.apache.spark.sql.types._;
    21. import org.apache.spark.streaming.Seconds;
    22. import org.apache.spark.streaming.StreamingContext;
    23. import org.apache.spark.streaming.kafka.KafkaUtils;
    24. import scala.collection.JavaConversions._;
    25. /**
    26. * Streaming Twitter Sentiment Classifier
    27. * Consume and deserialise Avro Tweets from Kafka Topics. Apply the same pre-processing
    28. * pipeline to the stream of tweets as our Decision Tree Classifier trainer and generate
    29. * scaled Feature Vectors. Finally apply the trained Decision Tree Classifier on the
    30. * stream of tweets to predict their sentiment in near real-time.
    31. *
    32. * Usage: KafkaStreamingSentimentClassifier <brokers> <topics>
    33. * brokers: List of one or more Kafka Brokers
    34. * topics: List of one or more Kafka Topics to consume from
    35. *
    36. * @author jillur.quddus
    37. * @version 0.0.1
    38. */
    39. object KafkaStreamingSentimentClassifier {
    40. // Spark Streaming Context
    41. val conf = new SparkConf()
    42. .setAppName("Streaming Twitter Sentiment Classifier")
    43. .setMaster("spark://<Spark Master Hostname>:7077");
    44. val sc = new SparkContext(conf);
    45. val ssc = new StreamingContext(sc, Seconds(5));
    46. val sparkSession = SparkSession.builder().getOrCreate();
    47. import sparkSession.implicits._;
    48. // Load the Trained Decision Tree Classifier
    49. val decisionTreeModel = PipelineModel.read.load("hdfs://<Namenode Hostname>:9000/keisan/knowledgebase/spark/mllib/models/decisionTreeClassifier");
    50. /**
    51. * Consume and deserialise Avro messages from the Kafka Topics using a Direct Stream Approach.
    52. * Pass the stream of tweets through the same pre-processing pipeline as the training dataset
    53. * and generate scaled Feature Vectors. Pass the scaled Feature Vectors to the trained
    54. * Decision Tree Classifier to predict the underlying sentiment.
    55. *
    56. * @param args List of Kafka Brokers and Topics from which to consume from
    57. */
    58. def main(args: Array[String]) = {
    59. if ( args.length < 2 ) {
    60. System.err.println("Usage: KafkaStreamingSentimentClassifier <brokers> <topics>");
    61. System.exit(1);
    62. }
    63. val Array(brokers, topics) = args;
    64. /********************************************************************
    65. * KAFKA CONSUMER DSTREAM
    66. ********************************************************************/
    67. // Specify the Kafka Broker Options and set of Topics
    68. val kafkaParameters = Map[String, String]("metadata.broker.list" -> brokers);
    69. val topicSet = topics.split(",").toSet;
    70. // Create an input DStream using KafkaUtils and the DefaultDecoder to provide the raw array of Bytes
    71. val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    72. ssc, kafkaParameters, topicSet);
    73. /********************************************************************
    74. * DESERIALISE USING INVERSION, PRE-PROCESS AND CLASSIFY
    75. ********************************************************************/
    76. // Deserialise the Avro messages using Bijection and the Tweet Schema
    77. messages
    78. .map(message => {
    79. val parser = new Schema.Parser();
    80. val schema = parser.parse(TwitterProducer.TWEET_SCHEMA);
    81. val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema);
    82. val record = recordInjection.invert(message._2).get;
    83. Row(record.get("id").toString(), record.get("user_name").toString(), record.get("text").toString());
    84. })
    85. .foreachRDD(rdd => {
    86. // Check that the RDD is not null - otherwise applying the predictive model will raise an IllegalStateException
    87. if (rdd != null && !rdd.isEmpty()) {
    88. // Convert the RDD of Rows to a DataFrame of Tweet ID, Username and Text
    89. val tweetsDF = sparkSession.createDataFrame(rdd,
    90. new StructType().add("id", StringType)
    91. .add("user_name", StringType)
    92. .add("text", StringType));
    93. /********************************************************************
    94. * APPLY THE SAME PRE-PROCESSING PIPELINE TO THE REAL-TIME TWEETS
    95. ********************************************************************/
    96. // Lowercase and remove punctuation
    97. val lowercasedDF = PreProcessorUtils.lowercaseRemovePunctuation(tweetsDF, "text");
    98. // Lemmatize
    99. val lemmatizedDF = lowercasedDF.select("user_name", "text").rdd.mapPartitions(p => {
    100. val props = new Properties();
    101. props.put("annotators", "tokenize, ssplit, pos, lemma");
    102. val pipeline = new StanfordCoreNLP(props);
    103. p.map{
    104. case Row(user_name:String, text: String) => (user_name, text, PreProcessorUtils.lemmatizeText(text, pipeline));
    105. };
    106. }).toDF("user_name", "text", "lemmas");
    107. // Remove Stop Words from the sequence of Lemmas
    108. val stopWordsRemovedDF = PreProcessorUtils.stopWordRemover(lemmatizedDF, "lemmas", "filtered_lemmas")
    109. .where(size(col("filtered_lemmas")) > 1);
    110. /********************************************************************
    111. * SCALED FEATURE VECTORS
    112. ********************************************************************/
    113. // Generate the Scaled Feature Vectors
    114. val featurizedDF = ModelUtils.tfidf(stopWordsRemovedDF, "filtered_lemmas", "features");
    115. /********************************************************************
    116. * APPLY TRAINED DECISION TREE CLASSIFIER TO THE REAL-TIME TWEETS
    117. ********************************************************************/
    118. // Apply the Decision Tree Classifier to the scaled Feature Vectors
    119. // Note that the Decision Tree Classifier was trained using a column called "features".
    120. // A column named "features" MUST also be present on the new dataset in order to make
    121. // predictions. If it is called anything else, an exception will be raised.
    122. val predictions = decisionTreeModel.transform(featurizedDF);
    123. // Output the original tweet's username, text and predicted label
    124. predictions.select("user_name", "text", "predicted_label").show(false);
    125. }
    126. });
    127. // Start the computation
    128. ssc.start();
    129. // Wait for the computation to terminate
    130. ssc.awaitTermination();
    131. }
    132. }

    我们现在可以运行我们的端到端机器学习管道!首先,我将稍微更新我的Kafka Twitter 生产者,根据英国的常见航空公司进行过滤,包括British_AirwaysRyanaireasyJet。假设我的Kafka集群在线,并且我的Kafka Producer正在运行,我可以使用Maven构建我的Spark Consumer Streaming Application并使用以下命令将其部署到我的Spark集群:

    1. # Deploy the Spark Consumer Streaming Application to the Spark Cluster
    2. bin/spark-submit --class io.keisan.knowledgebase.spark.mllib.KafkaStreamingSentimentClassifier --master spark://<Spark Master Hostname>:6066 --deploy-mode cluster /keisan/knowledgebase/spark/mllib/jars/keisan-spark-mllib-0.0.1-SNAPSHOT.jar <Kafka Broker Hostname>:9092 twitter

    假设一切顺利,如果您检查Spark驱动程序的标准输出,您应该看到Spark Dataframes正在打印,包含推文的用户名,文本和预测标签 - 请记住,在我们的案例中,真正的结果是推文是负面的情绪,而错误的结果是推文在情绪上是积极的。以下是一些使用我们的决策树分类器自动分类的推文:

    image.png

    正如您所看到的,我们的决策树分类器并不完全完美,但在大多数情况下,它已正确预测了推文的潜在情绪!航空公司和其他组织可以使用这样的预测模型来告知他们如何近乎实时地改进他们的服务和产品。
    在本文中,我们成功开发了一种近实时,高吞吐量,可靠且容错的机器学习管道,能够在流式传输和接收时对基于事件的数据进行准确预测。在以后的文章中,我将探讨其他类型的监督和非监督算法,以及如何将它们合并到实时管道中。