vlambda博客
学习文章列表

和美大家说 | 基于Spark MLlib的文本大数据处理


和美大“家”

和美大家说 | 基于Spark MLlib的文本大数据处理

作者 | Kane 

编辑 | 品牌部


在大数据时代,一些大公司,如四大银行、互联网巨头等公司积累了大量数据,对于统计/机器学习工程应用,通过数据抽样,我们使用scikit-learn上集成的丰富算法包来快速验证想法。当需要进行大规模机器学习任务来在全量数据上快速产出生产级应用时,我们往往需要利用MLlib的分布式迭代计算方案来解决scikit-learn上单机的性能瓶颈问题。


MLlib是Spark的机器学习库,通过该库可以简化机器学习的工程实践工作,同时解决了大数据量机器学习任务的迭代计算问题。MLlib包含了非常丰富的机器学习算法:分类、回归、聚类、协同过滤、主成分分析等等。目前,MLlib分为两个代码包:spark.mllibspark.ml


MLlib包含主要模块

和美大家说 | 基于Spark MLlib的文本大数据处理

spark.mllib

Spark MLlib是Spark的重要组成部分,是最初提供的一个机器学习库。该库有一个缺点:如果数据集非常复杂,需要做多次处理,或者是对新数据需要结合多个已经训练好的单个模型进行综合计算时,使用Spark MLlib会使程序结构变得复杂,甚至难以理解和实现。


spark.mllib是基于RDD的原始算法API,目前处于维护状态。该库下包含4类常见的机器学习算法:分类、回归、聚类、协同过滤。指的注意的是,基于RDD的API不会再添加新的功能。


和美大家说 | 基于Spark MLlib的文本大数据处理

spark.ml

Spark1.2版本引入了ML Pipeline,经过多个版本的发展,Spark ML克服了MLlib处理机器学习问题的一些不足(复杂、流程不清晰),向用户提供了基于DataFrame API的机器学习库,使得构建整个机器学习应用的过程变得简单高效。


Spark ML不是正式名称,用于指代基于DataFrame API的MLlib库 。与RDD相比,DataFrame提供了更加友好的API。DataFrame的许多好处包括Spark数据源,SQL / DataFrame查询,Tungsten和Catalyst优化以及跨语言的统一API。Spark ML API提供了很多数据特征处理函数,如特征选取、特征转换、类别数值化、正则化、降维等。另外基于DataFrame API的ml库支持构建机器学习的Pipeline,把机器学习过程一些任务有序地组织在一起,便于运行和迁移。Spark官方推荐使用spark.ml库。


和美大家说 | 基于Spark MLlib的文本大数据处理

数据变换

数据变换是数据预处理的一项重要工作,比如对数据进行规范化、离散化、衍生指标等等。Spark ML中提供了非常丰富的数据转换算法,详细可以参考官网,现归纳如下:

和美大家说 | 基于Spark MLlib的文本大数据处理

上面的转换算法中,词频逆文档频率(TF-IDF)、Word2Vec、PCA是比较常见的,如果你做过文本挖掘处理,那么对此应该并不陌生。


和美大家说 | 基于Spark MLlib的文本大数据处理

数据规约

大数据是机器学习的基础,为机器学习提供充足的数据训练集。在数据量非常大的时候,需要通过数据规约技术删除或者减少冗余的维度属性以来达到精简数据集的目的,类似于抽样的思想,虽然缩小了数据容量,但是并没有改变数据的完整性。Spark ML提供的特征选择和降维的方法如下表所示:

和美大家说 | 基于Spark MLlib的文本大数据处理

选择特征和降维是机器学习中常用的手段,可以使用上述的方法减少特征的选择,消除噪声的同时还能够维持原始的数据结构特征。尤其是主成分分析法(PCA),无论是在统计学领域还是机器学习领域,都起到了很重要的作用。


和美大家说 | 基于Spark MLlib的文本大数据处理

机器学习算法

Spark支持分类、回归、聚类、推荐等常用的机器学习算法。见下表:

和美大家说 | 基于Spark MLlib的文本大数据处理



ML任务基本流程

在介绍ML pipeline之前,我们先回顾一下一个ML任务包含的典型流程:

1. 准备训练数据集 (training examples)

2. 预处理及特征抽取 (training examples => features)

3. 训练模型 (training models(features))

4. 在测试集上进行模型评测 (testing examples => features => results)

可以看到整个ML任务实际上是一个dataflow。更确切地,是两条dataflow。一条是training过程,结束点是训练好的model,另一条是testing过程,结束点是最后得到的results (e.g., predictions)。如果要训练多个模型,那么dataflow会有更多条。


从high-level的角度来看,dataflow里只包含两种类型的操作:数据变换(上面的=>)与模型训练(产生model)。


和美大家说 | 基于Spark MLlib的文本大数据处理

机器学习流水线

1)DataFrame

有关于DataFrame的概念在前文中已经介绍过了。使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。较之RDD,DataFrame包 含了schema 信息,更类似传统数据库中的二维表格。它被ML Pipeline用来存储源数据。例如,DataFrame中 的列可以是存储的文本、特征向量、真实标签和预测的标签等。


2)Transformer

转换器是一种可以将一个 DataFrame转换为另一个DataFrame的算法。比如一个模 型就是一个 Transformer。它可以把一个不包含预测标签 的测试数据集 DataFrame 打上标签,转化成另一个包含 预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个 DataFrame。


3)Estimator

翻译成估计器或评估器,它是学习算法或 在训练数据上的训练方法的概念抽象。可以通俗理解为算法。在 Pipeline 里 通常是被用来操作 DataFrame 数据并生成一个 Transformer。从技术上讲,Estimator实现了一个方法 fit(),它接受一个DataFrame并产生一个转换器。比如, 一个随机森林算法就是一个 Estimator,它可以调用 fit(),通过训练特征数据而得到一个随机森林模型。


4)Pipeline

Pipeline 连接多个转换器和预测器在一起,形成一个机器学习工作流。在机器学习过程中,通过一系列的算法来处理和学习数据是很普遍的,例如,一个简单的文档处理工作流可能包括以下几步:

🔷 将每个文档分成单个词

🔷 将文档中的词转化成数字化的特征向量

🔷 基于特征向量和标签学习得到预测模型

MLlib 将上述一个工作流归为一个 Pipeline,包括一系列的 PipelineStage(多个 Transformer 和 Estimator),按特定的顺序执行。



管道执行原理

Pipeline 由一系列 stage 组成,每个 stage 为一个转换器 (Transformer) 或预测器 (Estimator)。这些 stage 的执行是按一定顺序的,输入的 DataFrame 在通过每个 stage 时被改变。在转换器阶段,transform() 方法作用在 DataFrame 上。预测器阶段,调用 fit() 方法来产生一个转换器(成为 PipelneModel 的一部分),然后该转换器的 transform() 方法作用在 DataFrame 上。


我们通过一个简单的文档工作流来解释其工作原理,下图展示了训练过程中 Pipeline 的工作流程:

和美大家说 | 基于Spark MLlib的文本大数据处理

Pipeline 在训练过程中的流程


上图中,上面一行表示 Pipeline 的三个 stage。前两个 Tokenizer 和 HashingTF 是转换器,第三个 LogisticRegression 是一个预测器。下面一行表示通过 pipeline 的数据流,圆柱体代表 DataFrame。Pipeline.fit() 方法作用于原始 DataFrame,其中包含原始的文档和标签。Tokenizer.transform() 方法将原始文档分成单个词,将其作为新的列加入 DataFrame。HashingTF.transform() 方法将文本列转化成特征向量,将这些向量作为新的列加入 DataFrame。因为 LogisticRegression 是一个预测器,Pipeline 首先调用 LogisticRegression.fit() 来产生一个 LogisticRegressionModel。如果 Pipeline 还有其他的预测器,在将 DataFrame 传入下一个 stage 前,它将先调用 LogisticRegressionModel 的 transform() 方法。


整个 Pipeline 可以看作一个预测器。因此,在一个 Pipeline 的 fit() 方法执行完毕后,它会产生一个 PipelineModel,它是一个转换器。这个 PipelineModel 可在测试阶段调用,下图展示了具体的工作流程:

和美大家说 | 基于Spark MLlib的文本大数据处理

PipelineModel 在测试过程中的流程


上图中,PipelineModel 和原始 Pipeline 有相同数量的 stage,但原始 Pipeline 中的预测器都变成了转换器。当 PipelineModel 的 transform() 方法被作用于测试数据集时,数据会按顺序穿过 pipeline 的各个阶段。每个 stage 的 transform() 方法都对数据集作了改变,然后再将其送至下一个 stage。


Pipeline 和 PipelineModel 有助于确保训练数据和测试数据的特征处理流程保持一致。



详细信息

DAG Pipeline: Pipeline 的 stage 是一个有序数组。这里引用的例子都是线性 Pipeline,其中的每个 stage 所用的数据都是由上一个 stage 产生的。但如果数据的流向为一个有向无环图 (DAG, Directed Acyclic Graph),那么 Pipeline 就可以是非线性的。这种图需要指定每个 stage 中输入和输出列的名字(通常通过参数来指定)。如果 Pipeline 的形式为 DAG,那么每个 stage 都必须为拓扑排序。


Runtime checking: 因为 Pipeline 可以运行在各种类型的 DataFrame 上,所以不能在编译时检查出错误类型。Pipeline 和 PipelineModel 会在 Pipeline 实际运行前进行检查。这种类型检查是基于 DataFrame schema 的,schema 包含了 DataFrame 中每一列数据的类型。


Unique Pipeline stages: Pipeline 的每一个 stage 都应该是唯一的实例。例如,同一个 myHashingTF 实例不应该进入 Pipeline 两次,因为 Pipeline stage 必须有唯一 ID。然而,不同的实例 myHashingTF1 和 myHashingTF2 可以进入同一个 Pipeline,因为两个实例是通过不同 ID 创建的。



存储和读取 Pipelines

通常情况下我们需要将一个模型或 pipeline 存储在磁盘上供以后使用。在 Spark 1.6 中,模型的导入和导出功能加入了 Pipeline 的 API,支持大多数转换器以及一些机器学习模型。具体还需参考算法的 API 文档来确认其是否支持存储和读取。



实际使用案例-文本处理

和美大家说 | 基于Spark MLlib的文本大数据处理

预处理流程

文本清洗 -> 标签索引化 -> 内容文本分词 -> 去除停用词 -> 分词取前5000个词作为特征 -> 特征向量化 -> 保存预处理模型 -> 调用预处理模型 -> 输出预处理数据(indexedLabel,features)

和美大家说 | 基于Spark MLlib的文本大数据处理

标签索引化

首先将文本读取成Dataframe格式,将标签列数据索引化,{文化,经济,军事和体育}向量化后为{0,1,2,3}

/**

    * 数据清洗 可根据具体数据结构和业务场景的不同进行重写. 注意: 输出必须要有标签字段"label"

    * @param filePath 数据路径

    * @param spark SparkSession

    * @return 清洗后的数据, 包含字段: "label", "title", "time", "content"

    */

  def clean(filePath: String, spark: SparkSession): DataFrame = {

    import spark.implicits._

    val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>

      val fields = line.split("\u00EF")   //分隔符:ï,分成标签,标题,时间,内容

      //首页|文化新闻ï第十一届全国优秀舞蹈节目展演将在武汉举办ï2016-07-05 19:25:00ï新华社北京7月5日电(记者周玮)由文化部...

      //首页|财经中心|财经频道ï上半年浙江口岸原油进口量创同期历史新高ï2016-07-04 21:54:00ï杭州7月4日...

      if (fields.length > 3) {

        val categoryLine = fields(0)

        val categories = categoryLine.split("\\|")

        val category = categories.last

        //分成4个标签名和其他,最后去除标签为其他的数据

        var label = "其他"

        if (category.contains("文化")) label = "文化"

        else if (category.contains("财经")) label = "财经"

        else if (category.contains("军事")) label = "军事"

        else if (category.contains("体育")) label = "体育"

        else {}

        //输出标签,标题,时间,内容

        val title = fields(1)

        val time = fields(2)

        val content = fields(3)

        if (!label.equals("其他")) Some(label, title, time, content) else None

      } else None

    }.toDF("label", "title", "time", "content")

    //输出标签,标题,时间,内容DF

    textDF

  }

  /**

    * 处理label转换为索引形式

    * @param data 输入label字段的数据

    * @return 标签索引模型, 模型增加字段: "indexedLabel"

    */

  def indexrize(data: DataFrame): StringIndexerModel = {

    val labelIndexer = new StringIndexer()

      .setInputCol("label")

      .setOutputCol("indexedLabel")

      .fit(data)

    labelIndexer

  }

predictDF.select("label","indexedLabel").show(10, truncate = false)

和美大家说 | 基于Spark MLlib的文本大数据处理



内容字段分词

处理内容字段,首先要进行分词,然后去除停用词以及转换为特征向量,方便分类模型进行训练和预测。本文模仿spark的ml包下的StopWordsRemover类创建了Segmenter类,用于对数据进行分词,其内部调用了HanLP分词工具。


由于spark自带的StopWordsRemover等使用的闭包仅限于ml包,自定义的类无法调用,故只是采用了与StopWordsRemover类似的使用形式,内部结构并不相同,并且由于以上原因,Segmenter类没有继承Transformer类,故无法进行pipeline管道操作,故在分类模型超参数调优过程中,没有加入分词模型的参数调优。

/**

    * 分词过程,包括"分词", "去除停用词"

    * @param data   输入需要分词的字段的数据"content"

    * @param params 分词参数

    * @return 分词处理后的DataFrame,增加字段: "tokens", "removed"

    */

  def segment(data: DataFrame, params: PreprocessParam): DataFrame = {

    val spark = data.sparkSession

    //设置分词模型

    val segmenter = new Segmenter()

      .setSegmentType(params.segmentType) //分词方式

      .isDelEn(params.delEn)              //是否去除英语单词

      .isDelNum(params.delNum)            //是否去除数字

      .addNature(params.addNature)        //是否添加词性

      .setMinTermLen(params.minTermLen)   //最小词长度

      .setMinTermNum(params.minTermNum)   //行最小词数

      .setInputCol("content")             //输入内容字段

      .setOutputCol("tokens")             //输出分词后的字段

    //进行分词

    val segDF = segmenter.transform(data)

    * 分词过程,包括"分词", "去除停用词"

    * @param data   输入需要分词的字段的数据"content"

    * @param params 分词参数

    * @return 分词处理后的DataFrame,增加字段: "tokens", "removed"

    */

  def segment(data: DataFrame, params: PreprocessParam): DataFrame = {

    val spark = data.sparkSession

    //设置分词模型

    val segmenter = new Segmenter()

      .setSegmentType(params.segmentType) //分词方式

      .isDelEn(params.delEn)              //是否去除英语单词

      .isDelNum(params.delNum)            //是否去除数字

      .addNature(params.addNature)        //是否添加词性

      .setMinTermLen(params.minTermLen)   //最小词长度

      .setMinTermNum(params.minTermNum)   //行最小词数

      .setInputCol("content")             //输入内容字段

      .setOutputCol("tokens")             //输出分词后的字段

    //进行分词

    val segDF = segmenter.transform(data)



去除停用词

分词之后,需要对一些常用的无意义词如:“的”、“我们”、“是”等(统称为“停用词”)进行去除。这些词没有多大的意义,但这些词不去掉会强烈的干扰我们对特征的抽取效果。(比如:在体育分类中,“的”出现500次,“足球”共出现300次,但显然足球更能表示体育分类,而“的”反而影响体育分类的结果。


去除停用词的操作我们直接调用ml包中的StopWordsRemover类:

 //读取停用词数据

val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()

    //设置停用词模型

    val remover = new StopWordsRemover()

      .setStopWords(stopWordArray)

      .setInputCol(segmenter.getOutputCol)   //读取"tokens"字段

      .setOutputCol("removed")               //输出删除停用词后的字段"removed"

    //删除停用词

    val removedDF = remover.transform(segDF)

    removedDF

  }



特征向量化

由于目前常用的分类、聚类等算法都是基于向量空间模型VSM(即将对象向量化为一个N维向量,映射成N维超空间中的一个点),VSM将数据转换为向量形式,便于对大规模数据进行矩阵操作等,也可以通过计算超空间中两个点之间的距离(一般是余弦距离)来计算两个向量之间的相似度。因此,我们需要将经过处理的语料转换为向量形式,这个过程叫做向量化。


这里我们也调用spark提供的向量化类CountVectorizer类进行向量化操作:

/**

   * 特征向量化处理,包括词汇表过滤

   * @param data   输入向量化的字段"removed"

   * @param params 配置参数

   * @return 向量模型

   */

 def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {

   //设置向量模型

   val vectorizer = new CountVectorizer()

     .setVocabSize(params.vocabSize)

     .setInputCol("removed")

     .setOutputCol("features")

   val parentVecModel = vectorizer.fit(data)

   //过滤停用词中没有的数字features

   val numPattern = "[0-9]+".r

   val vocabulary = parentVecModel.vocabulary.flatMap {

     term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)

   }

   val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)

     .setInputCol("removed")

     .setOutputCol("features")

   vecModel

 }

将字段"content"先进行分词和去除停用词得到"removed",再将所有词作为特征,进行特征向量化得到"features"字段:

和美大家说 | 基于Spark MLlib的文本大数据处理

在模型中可以设置出现次数最多的前5000个词作为分类用的特征,下图5000后有两个数组,第一个数值表示对应前5000个词的第几个词,第二组表示对应第一组出现的词在本条数据中的出现的次数,取出一条完整的数据看看:

和美大家说 | 基于Spark MLlib的文本大数据处理



数据处理模型训练、保存和调用

为了方便每个模型单独训练和预测,将预处理也作为数据处理的模型进行训练,保存和调用,方法如下:

 /**

    * 训练预处理模型

    * @param filePath 数据路径

    * @param spark SparkSession

    * @return (预处理后的数据,索引模型,向量模型)

   * 数据包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features"

    */

  def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam             //预处理参数

    val cleanDF = this.clean(filePath, spark)    //读取DF,清洗数据

    val indexModel = this.indexrize(cleanDF)     //调用索引模型

    val indexDF = indexModel.transform(cleanDF)  //标签索引化

    val segDF = this.segment(indexDF, params)    //将内容字段分词

    val vecModel = this.vectorize(segDF, params) //调用向量模型

    val trainDF = vecModel.transform(segDF)      //内容分词特征向量化

    this.saveModel(indexModel, vecModel, params) //保存模型

    (trainDF, indexModel, vecModel)

  }

  /**

    * 拟合预处理模型

    * @param filePath 数据路径

    * @param spark SparkSession

    * @return (预处理后的数据,索引模型,向量模型)

    */

  def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam                    //预处理参数

    val cleanDF = this.clean(filePath, spark)           //读取DF,清洗数据

    val (indexModel, vecModel) = this.loadModel(params) //加载索引和向量模型

    val indexDF = indexModel.transform(cleanDF)         //标签索引化

    val segDF = this.segment(indexDF, params)           //内容字段分词

    val predictDF = vecModel.transform(segDF)           //内容分词特征向量化

    (predictDF, indexModel, vecModel)

  }

调用预处理模型,数据处理后的结果取出5条:

和美大家说 | 基于Spark MLlib的文本大数据处理



多分类模型训练和超参数调优

下面选用了常用的4种多分类模型对文本数据进行训练,利用了管道Pipeline + 网格搜索Gridsearch + 交叉验证CrossValidator 进行参数调优,直接将参数调优放在了训练模型里,将得到的最优模型保存。

和美大家说 | 基于Spark MLlib的文本大数据处理

朴素贝叶斯

朴素贝叶斯算法原理

朴素贝叶斯算法是基于贝叶斯定理与特征条件独立假设的分类方法。

条件概率

P(A|B)表示事件B已经发生的前提下,事件A发生的概率,叫做事件B发生下事件A的条件概率。其基本求解公式为:

和美大家说 | 基于Spark MLlib的文本大数据处理

贝叶斯定理便是基于条件概率,通过P(A|B)来求P(B|A):

和美大家说 | 基于Spark MLlib的文本大数据处理

朴素贝叶斯模型

常用的模型主要有3个,多项式、伯努利和高斯模型:

🔷当特征是离散的时候,使用多项式模型。

🔷伯努利模型也适用于离散特征的情况,所不同的是,伯努利模型中每个特征的取值只能是1和0,以文本分类为例,某个单词在文档中出现过,则其特征值为1,否则为0,而本文是把单词出现的次数作为特征,所以不适应于伯努利模型。

🔷当特征是连续变量的时候,多项式模型及时加入平滑系数也很难描述分类特征,因此需要使用高斯模型。

平滑系数

超参数平滑系数α,作用是防止后验概率为0,当α = 1时,称作Laplace平滑,当0 < α < 1时,称作Lidstone平滑,α = 0时不做平滑。本文主要对平滑系数进行调参。

/**

    * NB模型训练处理过程

    * @param data 训练数据集

    * @return nbBestModel

    */

  def train(data: DataFrame): NaiveBayesModel = {

    val params = new ClassParam

    //NB分类模型管道训练调参

    data.persist()

    data.show(5)

    //NB模型

    val nbModel = new NaiveBayes()

      .setModelType(params.nbModelType) //多项式模型或者伯努利模型

      .setSmoothing(params.smoothing)   //平滑系数

      .setLabelCol("indexedLabel")

      .setFeaturesCol("features")

    //建立管道,模型只有一个 stages = 0

    val pipeline = new Pipeline()

      .setStages(Array(nbModel))

    //建立网格搜索

    val paramGrid = new ParamGridBuilder()

      //.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))

      //伯努利模型需要特征为01的数据

      .addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))

      .build()

    //建立evaluator,必须要保证验证的标签列是向量化后的标签

    val evaluator = new BinaryClassificationEvaluator()

      .setLabelCol("indexedLabel")

    //建立一个交叉验证的评估器,设置评估器的参数

    val cv = new CrossValidator()

      .setEstimator(pipeline)

      .setEvaluator(evaluator)

      .setEstimatorParamMaps(paramGrid)

      .setNumFolds(2)

    //运行交叉验证评估器,得到最佳参数集的模型

    val cvModel = cv.fit(data)

    //获取最优逻辑回归模型

    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

    val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]

    println("类的数量(标签可以使用的值): " + bestNBModel.numClasses)

    println("模型所接受的特征的数量: " + bestNBModel.numFeatures)

    println("最优的modelType的值为: "+ bestNBModel.explainParam(bestNBModel.modelType))

    println("最优的smoothing的值为: "+ bestNBModel.explainParam(bestNBModel.smoothing))

    //更新最优朴素贝叶斯模型,并训练数据

    val nbBestModel = new NaiveBayes()

      .setModelType(bestNBModel.getModelType) //多项式模型或者伯努利模型

      .setSmoothing(bestNBModel.getSmoothing) //平滑系数

      .setLabelCol("indexedLabel")

      .setFeaturesCol("features")

      .fit(data)

 

    this.saveModel(nbBestModel, params)

    data.unpersist()

    nbBestModel

  }

后续的三个算法原理网上都有很多,训练的代码也类似,本文只给出模型调参的部分代码。



逻辑回归

    //LR模型

    val lrModel = new LogisticRegression()

      .setMaxIter(bestLRModel.getMaxIter)    //模型最大迭代次数

      .setRegParam(bestLRModel.getRegParam)  //正则化参数

      .setElasticNetParam(params.elasticNetParam) //L1范式比例, L1/(L1 + L2)

      .setTol(params.converTol)          //模型收敛阈值

      .setLabelCol("indexedLabel")       //设置索引化标签字段

      .setFeaturesCol("features")        //设置向量化文本特征字段

 

    //建立网格搜索

    val paramGrid = new ParamGridBuilder()

      .addGrid(lrModel.maxIter, Array(5, 10))

      .addGrid(lrModel.regParam, Array(0.1, 0.2))

      .build()



决策树

  //决策树模型

    val dtModel = new DecisionTreeClassifier()

      .setMinInfoGain(params.minInfoGain)  //最小信息增益阈值

      .setMaxDepth(params.maxDepth)        //决策树最大深度

      .setImpurity(params.impurity)        //节点不纯度和信息增益方法gini, entropy

      .setLabelCol("indexedLabel")         //设置索引化标签字段

      .setFeaturesCol("features")          //设置向量化文本特征字段

    //建立网格搜索

    val paramGrid = new ParamGridBuilder()

      .addGrid(dtModel.minInfoGain, Array(0.0, 0.1))

      .addGrid(dtModel.maxDepth, Array(10, 20))

      .addGrid(dtModel.impurity, Array("gini", "entropy"))

      .build()



随机森林

随机森林模型常常需要调试以提高算法效果的两个参数:numTrees,maxDepth

· numTrees:增加决策树的个数会降低预测结果的方差,这样在测试时会有更高的accuracy。训练时间大致与numTrees呈线性增长关系

· maxDepth:限定决策树的最大可能深度。最终的决策树的深度可能要比maxDepth小

· minInfoGain:最小信息增益(设置阈值),但由于其它终止条件或者是被剪枝的缘故小于该值将不带继续分叉

· maxBins:连续特征离散化时选用的最大分桶个数,并且决定每个节点如何分裂。(25,28,31)

· impurity:计算信息增益的指标,熵和gini不纯度("entropy", "gini")

· minInstancesPerNode:如果某个节点的样本数量小于该值,则该节点将不再被分叉。(设置阈值)

· auto:在每个节点分裂时是否自动选择参与的特征个数

· seed:随机数生成种子

实际上要想获得一个适当的阈值是相当困难的。高阈值可能导致过分简化的树,而低阈值可能简化不够。


预剪枝方法 minInfoGain、minInstancesPerNode 实际上是通过不断修改停止条件来得到合理的结果,这并不是一个好办法,事实上 我们常常甚至不知道要寻找什么样的结果。这样就需要对树进行后剪枝了(后剪枝不需要用户指定参数,是更为理想化的剪枝方法)

//随机森林模型(不加fit)

    val rfModel = new RandomForestClassifier()

      .setMaxDepth(params.maxDepth)          //决策树最大深度

      .setNumTrees(params.numTrees)          //设置决策树个数

      .setMinInfoGain(params.minInfoGain)  //最小信息增益阈值

      .setImpurity(params.impurity)        //信息增益的指标,选择熵或者gini不纯度

      //.setMaxBins(params.maxBins)          //最大分桶个数,用于连续特征离散化时决定每个节点如何分裂

      .setLabelCol("indexedLabel")           //设置索引化标签字段

      .setFeaturesCol("features")            //设置向量化文本特征字段

//建立网格搜索

    val paramGrid = new ParamGridBuilder()

      .addGrid(rfModel.maxDepth, Array(5, 10, 20))

      .addGrid(rfModel.numTrees, Array(5, 10, 20))

      .addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))

      .build()



多分类模型预测和模型评估

和美大家说 | 基于Spark MLlib的文本大数据处理

模型评估类MulticlassClassificationEvaluator

机器学期一般都需要一个量化指标来衡量其效果:这个模型的准确率、召回率和F1值(这3个指标是评判模型预测能力常用的一组指标),spark提供了用于多分类模型评估的类MulticlassClassificationEvaluator,并将3个指标同时输出

object Evaluations extends Serializable {

  /**

    * 多分类结果评估

    * @param data 分类结果

    * @return (准确率, 召回率, F1)

    */

  def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {

    val metrics = new MulticlassMetrics(data)

    val weightedPrecision = metrics.weightedPrecision

    val weightedRecall = metrics.weightedRecall

    val f1 = metrics.weightedFMeasure

 

    (weightedPrecision, weightedRecall, f1)

  }

}



四个多分类模型预测结果和模型评估

以逻辑回归为例,预测结果如下图,"probability"中4个值表示4个类别的预测概率:

和美大家说 | 基于Spark MLlib的文本大数据处理

4个分类模型的评估结果如下:

评估模型代码:

/**

  * Description: 多分类模型预测结果评估对比

  */

object MultiClassEvalution {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

    Logger.getLogger("org").setLevel(Level.ERROR)


    val spark = SparkSession

      .builder

      .master("local")

      .appName("Multi_Class_Evaluation_Demo")

      .getOrCreate()

 

    val filePath = "data/dataTest/predict"

 

    //预处理(清洗、分词、向量化)

    val preprocessor = new Preprocessor

    val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)

 

    predictDF.select("content","removed", "features").show(1, truncate = false)

    //朴素贝叶斯模型预测

    val nbClassifier = new NBClassifier

    val nbPredictions = nbClassifier.predict(predictDF, indexModel)

 

    //逻辑回归模型预测

    val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier

    val lrPredictions = lrClassifier.predict(predictDF, indexModel)

 

    //决策树模型预测

    val dtClassifier = new DTClassifier

    val dtPredictions = dtClassifier.predict(predictDF, indexModel)

 

    //随机森林模型预测

    val rfClassifier = new RFClassifier

    val rfPredictions = rfClassifier.predict(predictDF, indexModel)

 

    //多个模型评估

    val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)

    val classNames = Seq("朴素贝叶斯模型", "逻辑回归模型", "决策树模型", "随机森林模型")

 

    for (i <- 0 to 3) {

      val prediction = predictions(i)

      val className = classNames(i)

 

      val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {

        case Row(prediction: Double, label: Double) => (prediction, label)

      }

 

      val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)

      println(s"\n========= $className 评估结果 ==========")

      println(s"加权准确率:$precision")

      println(s"加权召回率:$recall")

      println(s"F1值:$f1")

    }

  }

}

目前,Spark MLlib业务使用案例分布于各个领域,主要解决海量数据的工业级应用。在大规模数据的个性化推荐系统、数据预测分析、广告、异常监测,图像和视频模型识别、自然语言处理等领域都有丰富的应用,如电商的千人千面智能推荐系统、金融领域信贷审批、智能对话机器人等。


精彩回顾






领取和美信息产品资料,请联系:

如涉及图片及文章内容版权问题,请立即告知,我们将在第一时间核实处理。