6 - 教你如何使用Spark分布式执行Python脚本计算数据
Python拥有非常丰富的库,尤其是在科学计算领域,因此很多从事数据分析和科学计算的人偏爱Python。大数据有一个特点是存储在分布式系统,因此如何获取这些数据传给Python计算,并把计算结果存储到分布式系统,是一个不可避免的问题。
pyjava要求Python版本大于等于3.6,建议使用Conda的Python环境。
下面来创建conda的环境,pyjava依赖如下4个包:
conda.yaml
name: respect
dependencies:
- python=3.6
- pip
- pip:
- numpy==1.14.3
- pandas==0.25.3
- pyjava==0.2.8.2
- pyarrow==0.14.1
conda env create -f conda.yaml
uid url
1 http://docs.mlsql.tech
3 https://github.com/allwefantasy/pyjava
2 https://github.com/allwefantasy/pyjava
2 http://docs.mlsql.tech
1 http://docs.mlsql.tech
3 https://github.com/latincross/mlsqlwechat
url uid
组一:
1. http://docs.mlsql.tech 1
2. http://docs.mlsql.tech 1
3. http://docs.mlsql.tech 2
4. https://github.com/allwefantasy/pyjava 2
5. https://github.com/allwefantasy/pyjava 3
组二:
1. https://github.com/latincross/mlsqlwechat 3
第1条处理后,http://docs.mlsql.tech,pv=1 uv=1
第2条处理后,http://docs.mlsql.tech,url和第一条一样,pv=1+1=2 ,uid与第一条一样,uv=1
第3条处理后,http://docs.mlsql.tech,url和第二条一样,pv=2+1=3 ,uid与第二条不一样,uv=1+1=2
第4条处理后,https://github.com/allwefantasy/pyjava,url和第三条不一样,重置pv和uv,pv=1 uv=1
第5条处理后,https://github.com/allwefantasy/pyjava,url和第四条一样,pv=1+1=2,uid与第三条不一样,uv=1+1=2
https://github.com/latincross/mlsqlwechat,pv=1 uv=1
http://docs.mlsql.tech pv=3 uv=2
https://github.com/allwefantasy/pyjava pv=2 uv=2
https://github.com/latincross/mlsqlwechat pv=1 uv=1
val spark = SparkSession
.builder()
.master("local[*]")
.getOrCreate()
import spark.implicits._
val data = Seq(("http://docs.mlsql.tech" ,1),
("https://github.com/allwefantasy/pyjava" ,3),
("https://github.com/allwefantasy/pyjava" ,2),
("http://docs.mlsql.tech" ,2),
("http://docs.mlsql.tech" ,1),
("https://github.com/latincross/mlsqlwechat" ,3)
).toDF("url" ,"uid")
data.createOrReplaceTempView("test")
//根据url分组,组内根据uid排序
val df = spark.sql("select * from test distribute by url sort by uid")
//测试分组输出
df.rdd.foreachPartition(it => {
it.foreach(r=>println("partition id: " + TaskContext.get.partitionId + " ,row: " + r.toString()))
})
val struct = df.schema
val timezoneid = spark.sessionState.conf.sessionLocalTimeZone
val rest = df.rdd.mapPartitions { iter =>
val enconder = RowEncoder.apply(struct).resolveAndBind()
val envs = new util.HashMap[String, String]()
//设置conda python环境
envs.put(str(PythonConf.PYTHON_ENV)
, "source activate respect")
//使用本地python环境
//envs.put(str(PythonConf.PYTHON_ENV), ":")
val batch = new ArrowPythonRunner(
Seq(ChainedPythonFunctions(Seq(PythonFunction(
"""
|#测试的时候可以打开,输出更多调试信息
|#import os
|#os.environ["MLSQL_DEV"]="1"
|def process(_data_manager):
| pv = 0
| uv = 0
| lastUrl = None
| lastUid = None
| output = False
|
| for item in _data_manager.fetch_once_as_rows():
| urlCol = item["url"]
| uidCol = item["uid"]
|
| if lastUrl is None:
| pv = 1
| uv = 1
| lastUrl = urlCol
| lastUid = uidCol
| elif lastUrl == urlCol:
| pv = pv + 1
| if lastUid != uidCol:
| uv = uv + 1
| lastUid = uidCol
| else:
| #lastUrl变了,则输出
| output = True
| yield {"url":lastUrl,"pv":pv,"uv":uv}
|
| #重置pv,uv,output
| pv = 1
| uv = 1
| lastUrl = urlCol
| lastUid = uidCol
| output = False
| #当只有一个url情况下,lastUrl为空表示_data_manager没有数据,不需要输出
| if not output and lastUrl is not None:
| yield {"url":lastUrl,"pv":pv ,"uv":uv}
|data_manager.log_client.log_to_driver("come from worker")
|items=process(data_manager)
|data_manager.build_result(items, 1024)
""".stripMargin, envs, "python", "3.6")))), struct,
timezoneid, Map()
)
val newIter = iter.map { irow =>
enconder.toRow(irow)
}
val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)
val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)
columnarBatchIter.flatMap { batch =>
batch.rowIterator.asScala.map(_.copy())
}
}
//指定输出的shema,转为spark df
val wow = SparkUtils.internalCreateDataFrame(spark, rest,
StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)
wow.collect().foreach(println(_))
partition id: 0 ,row: [https://github.com/latincross/mlsqlwechat,3]
partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,2]
partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,3]
partition id: 108 ,row: [http://docs.mlsql.tech,1]20/05/14 16:54:23 INFO Executor: Finished task 31.0 in
partition id: 108 ,row: [http://docs.mlsql.tech,1]
partition id: 108 ,row: [http://docs.mlsql.tech,2]
[https://github.com/latincross/mlsqlwechat,1,1]
[https://github.com/allwefantasy/pyjava,2,2]
[http://docs.mlsql.tech,3,2]
调试后,把上述代码粘到spark代码中,就可以了。是不是很方便本地调试啊?
通过分组排序,还可以做很多复杂的计算,比如根据访问日志切割Session,还比如计费系统,相同请求在一分钟之内重复访问不计费,计算有效的计费请求等等,当然也可通过Spark Api开发。在Python的世界中有很多的类库,通过上述方式都是可以使用的。上述例子只是一个简单的迭代的方式,如果partition数据不是很大的情况下,还可以把数据全部加载到Python内存,比如转成pandas的dataframe,然后就可以尽情发挥了。
控制Python程序的状态,比如启动、停止、中断等
Spark与Python数据传输,比如通过File、Socket等
收集Python程序的日志,用户打印日志与系统打印日志
查看Python程序的执行进度
请参照:
https://github.com/allwefantasy/pyjava/pull/2
https://github.com/allwefantasy/pyjava/pull/3
-
#当数据量小的情况下很容易出现空的partition,导致报错,需要修复
serializers.py中的:
#rb = pa.RecordBatch.from_arrays([[]], schema=pa.schema([('value', pa.string())]))
rb = pa.RecordBatch.from_arrays([pa.array([""])], pa.schema([('value', pa.string())]))
//scala代码也要做相应的修改,需要过滤掉空的partition:
val newRest = rest.filter(_.toString != "[]")
val wow = SparkUtils.internalCreateDataFrame(spark, newRest,
StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)
笔者认为这样修复是合理的,因为空的partition也是由计算得来,也是有意义的,在代码端自行过滤掉也是正常的操作。虽然对于一个工具来说,看着不那么优雅。pyjava还支持Python项目模式,用于处理整个数据集,而不是分区方式,原理比较简单,感兴趣的读者请自行翻阅。
喜欢就点击最上方的[ MLSQL之道 ]关注下吧,后面精彩不断!