人工智能・Spark MLlib分布式处理框架
Spark MLlib分布式处理框架
上回我们说到了点击率预测问题、排序问题、推荐算法。首先,点击率预测问题的数据集特别大,是搜索引擎的所有用户点击每个网页次数的记录。其次,只要调用Spark MLlib分布式处理框架中的ALS()函数,就能在二十行代码以内,建立并训练一个协同推荐算法的模型。由此可见,这三种应用场景都非常适合大数据机器学习,尽管:
在搜索引擎公司刚成立的时候,数据集中的几乎没有任何样本,想要增加训练集样本的数量只能靠程序员自己人工标注;
我们可以对数据集采样,先用一小部分样本来验证一下模型;
只有数据可以存放在多台计算机上,而使用训练好的模型预测还是要在单机上进行。
那么,我们可以把存放其他计算机上的数据轮流拷贝到一台计算机上来训练模型吗?答案是可以的,但不太好,因为数据拷贝的速度比较慢,而且还会浪费其他计算机的计算能力。所以,我们需要的是分布式计算。
分布式计算is derived from Google公司的三篇论文:GFS分布式文件系统、MapReduce分布式计算框架,以及BigTable分布式数据库。
图1:MapReduce分布式计算框架
图2:使用MapReduce统计词频的示意图
在调用Spark MLlib分布式处理框架中的函数之前,宝宝们必须先安装pyspark库和Java JDK环境。值得注意的是,Java JDK环境不是用“pip install”命令安装的,而是用官网上下载的安装包来安装的。只有在cmd中输入了“java -version”命令,可以显示Java JDK的版本信息,这个环境才能be considered as “installation completed”。
图3:输入“java -version”命令,显示Java JDK环境的版本信息
最后,pyspark库算法的原理和sklearn库是完全相同的,只是增加了MapReduce分布式计算。而且,分布式计算是由底层的库来执行的,开发者并不会感觉到任务被分配到多台计算机上去执行。
代码 |
说明 |
from pyspark import SparkContext |
加载相关的库。 |
test_file_name = "day9_code/1_PySpark/test.txt" |
定义本地文本文件的路径。 |
sc = SparkContext("local", "wordcount app") |
创建SparkContext类的实例,object reference是sc。 这行代码的功能与tensorflow深度学习框框架中的“sess = tf.Session()”神相似:在使用Python描述计算过程之后,我们通常会把它放到C/C++高速核心中去执行,使得计算更加高效。 |
text_file = sc.textFile(test_file_name) |
打开位于本地的、名叫“test.txt”的文本文件。变量“text_file”的类型是RDD,即弹性分布式数据集。 |
counts = text_file\ .flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ .reduceByKey(lambda a, b: a + b).cache() |
宝宝们还记得被Java Stream支配的恐惧吗?如果忘记了,那么请从《》开始阅读。 在这行代码中,本黄鸭调用的flatMap()、map()、reduceByKey()都是Java Stream类中的函数。而且,它们的参数都是lambda expressions。 调用函数cache(),我们可以暂时把计算结果保存在运存中,而不是把它们写到磁盘中。这样做可以提高代码的执行效率。 |
print("counts.count() = " + str(counts.count())) |
执行刚才定义、计算过程的最后一个节点。变量“counts.count()”的类型不是RDD,而是Python类型的。 |
图4:代码运行的结果
图5:把自定义函数的名称作为函数reduceByKey()的参数
图6:Example of invoking Java.Stream.mapPartitions() method
图7:分布式梯度下降法的示意图
图8:Example of 分布式二值化
图9:Example of 分布式K-Means
这里还有几点需要补充:
1,分布式梯度下降法只可能为了提高计算速度而牺牲迭代的收敛性,不可能为了提高收敛性而降低速度。
2,“libsvm”与SVM支持向量机之间没有任何关系。这是一种非常紧凑的数据格式,除了标签之外,所有的数据都是按照“列的序号:数值”的格式来存放的。
图10:Example of data in“libsvm” format
3,如果宝宝们想要调用函数saveAsTextFile(),把代码输出的结果保存到指定的文件中,那么很可能会出现py4j.protocol.Py4JJavaError。这是因为在默认的情况下,函数saveAsTextFile()会到分布式文件系统中查找指定的文件,而不是在本地文件夹中查找。因此,本黄鸭强烈建议宝宝们把相对路径改为绝对路径。
4,因为分布式计算是由底层的库来执行的,开发者并不会感觉到任务被分配到多台计算机上去执行,所以,generally speaking,Spark MLlib分布式处理框架的使用方式和sklearn库差不多。宝宝们可以在“http://spark.apache.org/docs/latest/ml-guide.html”网站上,查看Spark MLlib库的文档。只是文档的质量比sklearn库的差很多,许多例子只能参考,不能执行。
图11:Spark MLlib库的文档
代码 |
说明 |
from pyspark.ml.classification import LogisticRegression from pyspark.sql import SparkSession from pyspark.ml.feature import StringIndexer from pyspark.ml.feature import VectorAssembler from pyspark.ml.evaluation import MulticlassClassificationEvaluator |
加载相关的库。 |
if __name__ == "__main__": |
如果代码被当做main函数,而不是模块,来执行。 |
spark = SparkSession.builder.appName("kdd99").getOrCreate() |
创建SparkSession类的实例,object reference是spark。 |
df = spark.read.option("header", "true").option("inferschema", "true")\ .option("mode", "DROPMALFORMED").format("com.databricks.spark.csv")\ .load(r"C:\Users\310277614\Desktop\python_work\day7_code\train_sample_ctr.csv").cache() |
导入数据集“train_sample_ctr.csv”。 在前文中,本黄鸭曾经从原始数据集“train.csv”中,随机选取了10000个样本,并保存在“train_sample_ctr.csv”中。 变量“df”的类型是RDD.dataframe。 |
df.printSchema() |
打印“df”每一列的名称、性质。 |
indexer = StringIndexer(inputCol="site_domain", outputCol="site_domainIndex") indexed = indexer.fit(df).transform(df) |
把“site_domain”列中的字符串转化为数值类型的数据。 |
indexed.describe("site_domainIndex").show() |
打印“site_domainIndex”列的统计信息。 |
filtered = indexed.select("click", "site_domainIndex", "hour") filtered.describe().show() |
在“df”中,选中“click”、“site_domainIndex”、“hour”三列。打印这三列的统计信息。 |
assembler = VectorAssembler(inputCols=["site_domainIndex", "hour"], outputCol="features") output = assembler.transform(filtered) output = output.select("features", "click") output.show(n=3) |
把“site_domainIndex”、“hour”列合并为特征矩阵。打印特征矩阵和标签的前三行。 |
(trainingData, testData) = output.randomSplit([0.7, 0.3]) |
切分训练集、测试集。 |
lr = LogisticRegression(labelCol="click", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8) model = lr.fit(trainingData) |
建立并训练分布式逻辑回归模型。 |
predictions = model.transform(testData) predictions = predictions.withColumnRenamed("click", "label") predictions.select("prediction", "label", "features").show(5) |
使用训练好的模型,来预测测试集样本的标签。打印前五行标签的预测值、真实值,以及测试集样本的特征。 |
predictionAndLabels = predictions.select("prediction", "label") evaluator = MulticlassClassificationEvaluator(metricName="accuracy") print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels))) |
打印模型在测试集上的准确度。 |
spark.stop() |
结束SparkSession。 |
图12:代码运行的结果
从上图中,我们可以看出,模型在测试集上的准确度不是非常高,这很可能是因为我们使用的特征比较少。
这里还有一点需要补充,那就是RDD和RDD.dataframe之间的关系。假设本黄鸭有一个名叫“employee.txt”的文本文件,里面有十行信息,每行信息的格式都是“员工姓名,薪水”。
把RDD转化为RDD.dataframe:
代码 |
说明 |
lines = sc.textFile("employee.txt") |
变量“lines”的类型是RDD。 |
parts = lines.map(lambda l: l.split(",")) employee = parts.map(lambda p: Row(name=p[0], salary=int(p[1]))) |
按照逗号的位置,把每行信息拆分“员工姓名”和“薪水”两列。 |
employee_temp = spark.createDataFrame(employee) |
变量“employee_temp”的类型是RDD.dataframe。 |
把RDD.dataframe转化为RDD:
代码 |
说明 |
result = employee_result.rdd.map(lambda p: "name: " + p.name + " salary: " + str(p.salary)).collect() |
调用函数map(),把“员工姓名”和“薪水”两列合并为一个字符串。 变量“result”的类型是RDD。 |
图13:Example of 分布式超参数搜索
图14:代码运行的结果
往期回顾
▼
点击图片,传送到《人工智能・点击率预测和推荐算法》 | 图片来源:网络
点击图片,传送到《人工智能・谭松波酒店评论分类》 | 图片来源:网络
点击图片,传送到《人工智能・人脸表情识别》 | 图片来源:网络
▼