搜文章
推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > alitrack > 开窗函数之累积乘,PySpark,Pandas和SQL版实现

开窗函数之累积乘,PySpark,Pandas和SQL版实现

alitrack 2020-07-01

前一篇介绍了, 今天继续开窗函数的累积函数系列,累积乘积(也可以简称为累乘)。

累加cumsum和累乘cumprod主要是用来看数据的变化趋势.

  • 累加是通过流量得到存量,比如每天销售量的多少,得到今年的销售量总量;

  • 累乘是通过变化率来得到存量,比如有每天的数据变动趋势,通过累乘来得到当前的数据;


cumprod在计算股票累计投资组合收益时非常有用,这个会在以后介绍。


测试环境

  • Jupyter Notebook

  • iPython-sql


  • Pandas




数据准备


import pandas as pdimport numpy as npvalues=[(1,0.9), (1,0.13), (2,0.5), (2,0.8), (3,0.6)]columns=['col1', 'col2']df=pd.DataFrame(values, columns=columns)df


Pandas实现

df0=df.copy()df0=df0.sort_values(by=['col1','col2'])df0['cumprod'] = df0.groupby('col1')['col2'].cumprod()df0


PySpark 实现


准备工作

from pyspark.sql import SparkSessionspark=SparkSession.Builder().master("local[*]").getOrCreate()
from pyspark.sql import functions as F, Window, typesfrom functools import reducefrom operator import mul
df1=spark.createDataFrame(df)#df1.show()
partition_column = ['col1']window = Window.partitionBy(partition_column).orderBy("col2")expr = F.col('col2')

使用UDF

mul_udf = F.udf(lambda x: reduce(mul, x), types.DoubleType())df2 = df1.withColumn('col3', mul_udf(F.collect_list(expr).over(window)))df2.orderBy('col1').show()


使用aggregate函数

df2 = df1.withColumn("foo", F.collect_list("col2").over(window))#df2.show()
df3=df2.withColumn("cumprod", F.expr("aggregate(foo, cast(1 as double), (acc, x) -> acc * x)"))df3.orderBy('col1').show()


Spark SQL

df1.registerTempTable("df")sql="""select col1,col2, exp(sum(log(col2)) over( partition by col1 order by col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as cumprodfrom df"""spark.sql(sql).show()


SQL 版本

SQLite不支持log和exp函数,所以要实现它,比较麻烦(当然可以使用python来创建udf简化),这里写下PostgreSQL的实现,与Spark SQL基本相同


%load_ext sql%sql postgres://postgres:@localhost/postgres
%sql drop table if exists df%sql -p df
%%sqlselect col1,col2, power(10,sum(log10(col2)) over( partition by col1 order by col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as cumprodfrom df

或者

%%sqlselect col1,col2, exp(sum(ln(col2)) over( partition by col1 order by col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) as cumprodfrom df



Koalas


如果有什么建议和意见,也欢迎留言,或者加我个人微信,



谢谢点亮[看]




版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《开窗函数之累积乘,PySpark,Pandas和SQL版实现》的版权归原作者「alitrack」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读