推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > Spark技术日报 > 一篇文章搞懂 PySpark MLlib

一篇文章搞懂 PySpark MLlib

Spark技术日报 2018-10-28

001

PySpark MLlib 基础


PySpark MLlib 单独成篇是因为,MLlib 提供了大量和特征工程相关的工具,譬如缺失值处理,字符转化为数字,词向量工具等,这些可以都是使用Scala语言分布式实现,并且可以使用Python来调用。在这个章节,我们会用一个实际项目来练手,可能有点难度,但是如果细加揣摩的话,会有巨大的帮助。

在进行代码练习之前,我们需要做一些准备工作。

//下载项目git clone https://github.com/allwefantasy/spark-deep-learning.git .//切换到release 分支git checkout release//进入spark-deep-learning目录运行如下命令build/sbt assembly

如果没有大问题,应该就有了下面的包了:

target/scala-2.11/spark-deep-learning-assembly-0.2.0-spark2.1.jar

如果这一步骤编译太久或者你觉得太麻烦了,也是可以跳过的,可以感受下Spark MLlib的用法。

我们的目标,是要设计这么一个系统,当把一张拥有很多字段的表给系统,系统自动抽取出特征,这些特征可以给机器学习算法如SVM,贝叶斯之类的,也可以适配深度学习的要求。初看起来显得野心太大,但也不是不可能,我们总是需要一些挑战的。

为了实现自动特征化,核心是四点:类型,规则,统计,先验。

类型,所谓类型指的是Spark DataFrame 的数据是强类型的,常见类型有String,Int, Double, Float, Array, VectorUDF等,他们其实可以给我们提供一定的信息,比如String一般而言有两种可能性:

需要分词的字段,一般而言会转化tf/idf 或者word sequence(LSTM/CNN)形式。 不需要分词的字段,一般其实就是分类字段。

Int 我们可以求一个distinct值,如果很少,很可能是一个分类字段,比如性别,年龄等。Double, Float等则可能是连续的,比如可能是金额等。

规则,字段的名字也能给我们一定的启发,通常如果类型是String,并且名字还是title,body,sentence,summary之类的,一般是需要分词的字段。

Int类型而且还是age,gender之类的名字,则必定是个分类字段。在类型的基础上,让我们更好的确认,该如何特征化某个字段。

统计,当规则无法给我们帮助时,我们仅仅知道某个字段是一个int,我们该怎么办,这个时候统计就起作用了,如果某个字段只有少数几个类型,比如性别 ,统计只有两种可能性,这么少的可能性,那我们就可以对待为分类属性,可以进行one-hot化了。

如果发现有几十万个种类,可能就是售价之类的,那么就自然当做连续值即可,并且我们可能需要做一些缺失值处理。

先验,当然,我们可以通过人工干预,比如明确告知系统哪些是需要分词的字段,哪些是字段需要离散化,这些作为系统的先验知识。系统自动识别这种规则,然后自动进行处理,你唯一需要做的就是告知哪些字段要做什么处理。

因为本小节的目标是为了为了演示如何利用PySpark MLlib完成这个强大的任务,为了简化起见,在本文中,我们只做下面提及的规则集,更复杂的规则集,在我上面提到的GitHub项目中有。

  • 对浮点类型做缺失值处理

  • String类型字段如果是分类字段,则转化为one-hot向量

  • 需要分词的String类型字段,word sequence & word embedding形态。


其中第三条,可以保证我们处理完的数据直接丢给Tensorflow进行模型训练。具体细节我们会在后续的章节里面阐述。

对浮点类型做缺失值处理,这个比较简单,我们先说下大家需要导入哪些PySpark的包:

from pyspark.ml import Estimator, Transformerfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import Imputer, Param, Params, TypeConverters, VectorAssembler, VectorIndexer, Tokenizer, \    HashingTF, OneHotEncoder, QuantileDiscretizer, Normalizerfrom pyspark.ml.linalg import VectorUDT, Vectors, DenseVectorfrom pyspark.sql.types import *import pyspark.sql.functions as fn

接着我想把一个DataFrame的某一列进行缺失值处理,依然采用之前的数据集,但是内容需要做些调整:

a b c,1.0a b,c,3.0d,4.0

df = spark.read.csv(    "YOUR-PATH/a.csv",    encoding="utf-8",    header=False,    schema=StructType(        [StructField("text", StringType()),         StructField("index", FloatType())])) imputer = Imputer(inputCols=["index"], outputCols=["index_wow"]) df = imputer.fit(df).transform(df) df.show()

执行结果如下:

这里我们看到,核心只有两行代码,构建Imputer,调用对应的fit,transform方法形成一个新的表(df)就可以了。

PySpark MLlib里,有几个概念,需要了解下:

  • Pipline 组合Estimator,Transformer完成一个完整的数据处理过程。

  • Estimator 你可以理解为算法的训练过程。

  • Transformer 模型进行预测

  • Parameter, Estimator ,Transformer共享的参数配置体系


这里比较核心的概念是Estimator,Transformer。Estimator核心方法是fit, Transformer的核心方法是transform。拿前面的Imputer为例,他首先有个训练的过程,需要拿到所有的index字段的值,做一些计算,得到一个模型,接着才能做出预测,填充缺失的值。所以Imputor是一个Estimator,同时也是一个Transformer。

当我们想把多个Imputor链接在一起,那么可以使用Pipline。

现在我们看看,如何把一个字符串转化为一个one-hot向量。为了完成这个工作,我们需要先把文本转化为依次递增的数字,然后再把数字转化为one-hot向量,为了方便,我使用了Pipline:

string_index = StringIndexer(inputCol="text",outputCol="text_number") encoder = OneHotEncoder(inputCols=["text_number"], outputCols=["text_onehot"]) pipline = Pipeline(stages=[string_index,encoder]) pipline.fit(df).transform(df).show()

运行结果如下:

在上面的图片中你可能会感到好奇,为什么text_onehot列是类似这么一个东西:

(3,[1],[1.0])

实际的含义为:该向量长度为3,第一个位置为1。可以理解为向量稀疏表达的一种方式。

现在,让我们来一个复杂的,我希望把text字段分词(按空格),每个词用一个向量表示,组成一个二维矩阵,并且把得到行号,然后把文本字段里的词替换成行号。类似下面这样:

a: Vector1b: Vector2c: Vector3

a 在第0个位置,b在第一个位置,给定一行,比如:

a c b

最终被替换成

0 2 1

首先我们要训练词向量:

word2vec = Word2Vec(vectorSize=100, minCount=1,                          inputCol="text",                            outputCol="text_vector") w2v_model = word2vec.fit(ds) word_embedding = w2v_model.getVectors().rdd.map(            lambda p: dict(word=p.word, vector=p.vector.values.tolist())).collect()

这一步我们把text字段喂给Word2Vec模型,然后获取得到词向量word_embedding。

word_embedding_with_index = [dict(word_index=(idx + 1), word=val["word"], vector=val["vector"]) for                                          (idx, val)                                          in                                          enumerate(word_embedding)] word_embedding_with_index.insert(0, dict(word_index=0, word="UNK",                                                    vector=np.zeros(100).tolist()))

对数组生成一个下标,从1开始,然后在数组前端插入一个记录,叫UNK,并且值是一个100维的0。这主要是我们后续在做NLP处理时解决词汇不在向量词典里的情况。

这样我们就得到了上面的关系:

  • 词被表示成了向量

  • 词都有一个唯一的数字标识


从上面三个例子,我们可以看到PySpark MLlib是非常强大的,可以做非常多的特征工程相关的工作。

请各位兄弟帮忙点下阅读原文,感谢 

↓↓↓

版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《一篇文章搞懂 PySpark MLlib》的版权归原作者「Spark技术日报」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

关注Spark技术日报微信公众号

Spark技术日报微信公众号:SparkDaily

Spark技术日报

手机扫描上方二维码即可关注Spark技术日报微信公众号

Spark技术日报最新文章

精品公众号随机推荐