6 - 教你如何使用Spark分布式执行Python脚本计算数据
Python拥有非常丰富的库,尤其是在科学计算领域,因此很多从事数据分析和科学计算的人偏爱Python。大数据有一个特点是存储在分布式系统,因此如何获取这些数据传给Python计算,并把计算结果存储到分布式系统,是一个不可避免的问题。
pyjava要求Python版本大于等于3.6,建议使用Conda的Python环境。
下面来创建conda的环境,pyjava依赖如下4个包:
conda.yamlname: respectdependencies:- python=3.6- pip- pip:- numpy==1.14.3- pandas==0.25.3- pyjava==0.2.8.2- pyarrow==0.14.1
conda env create -f conda.yaml
uid url1 http://docs.mlsql.tech3 https://github.com/allwefantasy/pyjava2 https://github.com/allwefantasy/pyjava2 http://docs.mlsql.tech1 http://docs.mlsql.tech3 https://github.com/latincross/mlsqlwechat
url uid组一:1. http://docs.mlsql.tech 12. http://docs.mlsql.tech 13. http://docs.mlsql.tech 24. https://github.com/allwefantasy/pyjava 25. https://github.com/allwefantasy/pyjava 3组二:1. https://github.com/latincross/mlsqlwechat 3
第1条处理后,http://docs.mlsql.tech,pv=1 uv=1第2条处理后,http://docs.mlsql.tech,url和第一条一样,pv=1+1=2 ,uid与第一条一样,uv=1第3条处理后,http://docs.mlsql.tech,url和第二条一样,pv=2+1=3 ,uid与第二条不一样,uv=1+1=2第4条处理后,https://github.com/allwefantasy/pyjava,url和第三条不一样,重置pv和uv,pv=1 uv=1第5条处理后,https://github.com/allwefantasy/pyjava,url和第四条一样,pv=1+1=2,uid与第三条不一样,uv=1+1=2
https://github.com/latincross/mlsqlwechat,pv=1 uv=1
http://docs.mlsql.tech pv=3 uv=2https://github.com/allwefantasy/pyjava pv=2 uv=2https://github.com/latincross/mlsqlwechat pv=1 uv=1
val spark = SparkSession.builder().master("local[*]").getOrCreate()import spark.implicits._val data = Seq(("http://docs.mlsql.tech" ,1),("https://github.com/allwefantasy/pyjava" ,3),("https://github.com/allwefantasy/pyjava" ,2),("http://docs.mlsql.tech" ,2),("http://docs.mlsql.tech" ,1),("https://github.com/latincross/mlsqlwechat" ,3)).toDF("url" ,"uid")data.createOrReplaceTempView("test")//根据url分组,组内根据uid排序val df = spark.sql("select * from test distribute by url sort by uid")//测试分组输出df.rdd.foreachPartition(it => {it.foreach(r=>println("partition id: " + TaskContext.get.partitionId + " ,row: " + r.toString()))})val struct = df.schemaval timezoneid = spark.sessionState.conf.sessionLocalTimeZoneval rest = df.rdd.mapPartitions { iter =>val enconder = RowEncoder.apply(struct).resolveAndBind()val envs = new util.HashMap[String, String]()//设置conda python环境envs.put(str(PythonConf.PYTHON_ENV), "source activate respect")//使用本地python环境//envs.put(str(PythonConf.PYTHON_ENV), ":")val batch = new ArrowPythonRunner(Seq(ChainedPythonFunctions(Seq(PythonFunction("""|#测试的时候可以打开,输出更多调试信息|#import os|#os.environ["MLSQL_DEV"]="1"|def process(_data_manager):| pv = 0| uv = 0| lastUrl = None| lastUid = None| output = False|| for item in _data_manager.fetch_once_as_rows():| urlCol = item["url"]| uidCol = item["uid"]|| if lastUrl is None:| pv = 1| uv = 1| lastUrl = urlCol| lastUid = uidCol| elif lastUrl == urlCol:| pv = pv + 1| if lastUid != uidCol:| uv = uv + 1| lastUid = uidCol| else:| #lastUrl变了,则输出| output = True| yield {"url":lastUrl,"pv":pv,"uv":uv}|| #重置pv,uv,output| pv = 1| uv = 1| lastUrl = urlCol| lastUid = uidCol| output = False| #当只有一个url情况下,lastUrl为空表示_data_manager没有数据,不需要输出| if not output and lastUrl is not None:| yield {"url":lastUrl,"pv":pv ,"uv":uv}|data_manager.log_client.log_to_driver("come from worker")|items=process(data_manager)|data_manager.build_result(items, 1024)""".stripMargin, envs, "python", "3.6")))), struct,timezoneid, Map())val newIter = iter.map { irow =>enconder.toRow(irow)}val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)columnarBatchIter.flatMap { batch =>batch.rowIterator.asScala.map(_.copy())}}//指定输出的shema,转为spark dfval wow = SparkUtils.internalCreateDataFrame(spark, rest,StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)wow.collect().foreach(println(_))
partition id: 0 ,row: [https://github.com/latincross/mlsqlwechat,3]partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,2]partition id: 31 ,row: [https://github.com/allwefantasy/pyjava,3]partition id: 108 ,row: [http://docs.mlsql.tech,1]20/05/14 16:54:23 INFO Executor: Finished task 31.0 inpartition id: 108 ,row: [http://docs.mlsql.tech,1]partition id: 108 ,row: [http://docs.mlsql.tech,2]
[https://github.com/latincross/mlsqlwechat,1,1][https://github.com/allwefantasy/pyjava,2,2][http://docs.mlsql.tech,3,2]
调试后,把上述代码粘到spark代码中,就可以了。是不是很方便本地调试啊?
通过分组排序,还可以做很多复杂的计算,比如根据访问日志切割Session,还比如计费系统,相同请求在一分钟之内重复访问不计费,计算有效的计费请求等等,当然也可通过Spark Api开发。在Python的世界中有很多的类库,通过上述方式都是可以使用的。上述例子只是一个简单的迭代的方式,如果partition数据不是很大的情况下,还可以把数据全部加载到Python内存,比如转成pandas的dataframe,然后就可以尽情发挥了。
控制Python程序的状态,比如启动、停止、中断等
Spark与Python数据传输,比如通过File、Socket等
收集Python程序的日志,用户打印日志与系统打印日志
查看Python程序的执行进度
请参照:
https://github.com/allwefantasy/pyjava/pull/2
https://github.com/allwefantasy/pyjava/pull/3
-
#当数据量小的情况下很容易出现空的partition,导致报错,需要修复serializers.py中的:#rb = pa.RecordBatch.from_arrays([[]], schema=pa.schema([('value', pa.string())]))rb = pa.RecordBatch.from_arrays([pa.array([""])], pa.schema([('value', pa.string())]))//scala代码也要做相应的修改,需要过滤掉空的partition:val newRest = rest.filter(_.toString != "[]")val wow = SparkUtils.internalCreateDataFrame(spark, newRest,StructType(Seq(StructField("url", StringType),StructField("pv", LongType),StructField("uv",LongType))), false)
笔者认为这样修复是合理的,因为空的partition也是由计算得来,也是有意义的,在代码端自行过滤掉也是正常的操作。虽然对于一个工具来说,看着不那么优雅。pyjava还支持Python项目模式,用于处理整个数据集,而不是分区方式,原理比较简单,感兴趣的读者请自行翻阅。
喜欢就点击最上方的[ MLSQL之道 ]关注下吧,后面精彩不断!
