vlambda博客
学习文章列表

小白如何写Python算法-计算模型稳定性评估指标PSI

小白如何写Python算法-计算模型稳定性评估指标PSI

前言

最近在研究如何存储和查询十亿数据的事情
突然插了一档子事
公司让我临时救个🔥
python算法需求比较多 人手紧缺
让我来弄个算法需求
计算PSI
我一听懵逼了 啥是PSI
临危受命 不行也得上啊

PSI知识储备

简介

PSI反映了验证样本在各分数段的分布与建模样本分布的稳定性
在建模中,我们常用来筛选特征变量、评估模型稳定性

稳定性是有参照的
因此需要有两个分布——实际分布(actual)和预期分布(expected)
在建模时通常以训练样本(In the Sample, INS)作为预期分布
而验证样本通常作为实际分布
把两个分布重叠放在一起

大白话解释

训练数据中的标签列即y列 表示实际数据
训练数据用于模型训练 得到一个模型
用该模型进行数据预测 预测出来的y列即预期数据
将实际数据和预期数据进行同样的分段 
每一段内分别计算数据分布 然后比较
用来反映模型的稳定性

必要性

风控 稳定性压倒一切
一套风控模型 通常一年以上 才会被替换下线
如果模型不稳定 意味着模型不可控
对于业务本身而言就是一种不确定性风险
直接影响决策的合理性 
直观理解上的系统稳定 通常是指某项指标波动小(低方差)
指标曲线几乎是一条水平的直线
此时就会觉得系统运行正常稳定 很有安全感

变异系数cv

变异系数 cv 来衡量这种数据波动水平
变异系数越小,代表波动越小,稳定性越好
变异系数 C·V =( 标准偏差 SD / 平均值Mean )× 100%

只用变异系统来表示稳定性是不够的

因为实际中由于受到客群变化(互金市场用户群体变化快)
数据源采集变化(比如爬虫接口被风控了)等等因素影响
实际样本分布将会发生偏移
就会导致模型不稳定
所以才需要PSI

PSI公式

小白如何写Python算法-计算模型稳定性评估指标PSI
小白如何写Python算法-计算模型稳定性评估指标PSI

计算步骤

步骤一

将变量预期分布(excepted)进行分箱(binning)离散化
统计各个分箱里的样本占比

分箱方式

分箱可以是等频、等距或其他方式
分箱方式不同,将导致计算结果略微有差异

变量类型

  • 连续型变量
对于连续型变量(特征变量、模型分数等)
分箱数需要设置合理,一般设为10或20
  • 离散型变量
如果分箱太多可以提前考虑合并小分箱
分箱数太多,可能会导致每个分箱内的样本量太少而失去统计意义
分箱数太少,又会导致计算结果精度降低

步骤二

按相同分箱区间
对实际分布(actual)统计各分箱内的样本占比

步骤三

算各分箱内的A - E和Ln(A / E)
计算index = (实际占比 - 预期占比)* ln(实际占比 / 预期占比) 

步骤四

将各分箱的index进行求和,即得到最终的PSI
小白如何写Python算法-计算模型稳定性评估指标PSI

业务含义

PSI数值越小,两个分布之间的差异就越小,代表越稳定
小白如何写Python算法-计算模型稳定性评估指标PSI

PSI近似概念相对熵

相对熵/KL散度/信息散度
是两个概率分布间差异的非对称性度量

物理含义

1、当两个随机分布相同时,它们的相对熵为零
当两个随机分布的差别增大时,它们的相对熵也会增大

2、相对熵是一个从信息论角度量化距离的指标 与数学概念上的距离有所差异
数学上的距离需要满足:非负性、对称性、同一性、传递性等
而相对熵不满足对称性

计算公式

在信息理论中,相对熵等价于两个概率分布的信息熵(Shannon entropy)的差值
小白如何写Python算法-计算模型稳定性评估指标PSI

P(x)表示数据的真实分布,而Q(x)表示数据的观察分布

简单理解

概率分布携带着信息,可以用信息熵来衡量
若用观察分布Q(x)来描述真实分布P(x),还需要多少额外的信息量
小白如何写Python算法-计算模型稳定性评估指标PSI

KL散度是单向描述信息熵差异

数值案例介绍相对熵

假如一个字符发射器
随机发出0和1两种字符
真实发出概率分布为A,但实际不知道A的具体分布
通过观察,得到概率分布B与C,各个分布的具体情况如下
小白如何写Python算法-计算模型稳定性评估指标PSI

计算出得到KL散度

小白如何写Python算法-计算模型稳定性评估指标PSI

由上式可知

相对熵KL(A||C) > KL(A||B)
说明A和B之间的概率分布在信息量角度更为接近
而通过概率分布可视化观察,也认为A和B更为接近,两者吻合

相对熵与PSI之间的关系

PSI公式变形

小白如何写Python算法-计算模型稳定性评估指标PSI

将PSI计算公式变形后可以分解为2项

小白如何写Python算法-计算模型稳定性评估指标PSI

结论

  • PSI本质上是实际分布(A)与预期分布(E)的KL散度的一个对称化操作

  • 双向计算相对熵,并把两部分相对熵相加,从而更为全面地描述两个分布的差异

PSI指标的业务应用

实际评估需要分不同粒度

  • 时间粒度(按月、按样本集)
  • 订单层次(放贷层、申请层)
  • 人群(若没有分群建模,可忽略)

时间窗

时间窗尽可能至今为止
有可能建模时间窗稳定
但近期时间窗出现不稳定

评估过程

  • 先在放贷样本上计算PSI,剔除不稳定的特征;
  • 再对申请样本抽样(可能数据太大)
  • 计算PSI再次筛选

建议先看变量数据分布(EDD)

PSI只是一个宏观的指标
建议先看变量数据分布(EDD)
看分位数跨时间变化来检验数据质量

PSI计算细节也予以保留

无法得知PSI上升时,数据分布是左偏还是右偏
建议把PSI计算细节也予以保留
便于在模型不稳定时,第一时间排查问题

PSI源码

https://gitee.com/pingfanrenbiji/population-stability-index-argo/blob/master/psi.py

结合具体业务实现需求

经过上面的学习 咱们已经知道了PSI是个什么玩意了
而且还有了实现好的算法源码
该算法需要2方面的数据 
一个是实际数据(训练数据)

实际数据即是训练数据集中的标签列 
比如贷款数据样本 
标签列为是否按时还款

另一个是预期数据(预测数据根据模型得到的预测结果)

预期样本是
根据训练出来的模型
对于即将要贷款的用户进行预测是否会还款

接下来就要结合自己公司的业务来得到这块数据调用算法就可以了

搭建python服务

公司让我来做psi
因为我是用java开发的嘛 
其实是想让我用java来实现psi
我当时就感觉不太合理 java不合适写算法
后来的调研也确认了这个想法
算法还是得需要用ptyhon来写
且都已经实现好了 理解了原理之后 
直接就可以用了

用python写的话 对于写java的我来说
有点难度 但稍微克服一下就可以了

python web框架 flask

源码demo
https://gitee.com/pingfanrenbiji/simple-model-monitor-demo
基本语法记录一下
  • restful api
from flask_restful import reqparse, abort, Api, Resource

@app.route('/test/<jobid>', methods=['POST'])
def test_psi_last(jobid):
    return "成功"

注意:参数用<> 而非{}

  • sql查询语句执行 返回元组类型
import pymysql

 jobid = '1111111'
 # 创建数据库连接
 conn = pymysql.connect(
        host=mysql_host,
        user=mysql_user,
        password=mysql_password,
        db=mysql_db,
        port=mysql_port
 )
# 获取操作游标,也就是开始操作
cur = conn.cursor() 
# 定义查询sql语句 该sql返回一条数据
sql = """
      select id from studio_algor_param_record where job_no=%s order by createtime desc limit 1;
    "
""
#执行查询
cur.execute(sql, (jobid))
# 获取查询结果
mid = cur.fetchall()
#关闭连接
conn.close()
# mid是tuple元组数据类型((xxxx,))
res_list = [x[0] for x in mid]
# 获取xxxx字符串
mid = res_list[0]
  • 通过pandas执行sql查询返回dataframe类型
# 数据库连接的获取同上

# 查询sql语句 返回list集合
sql = 'select predict_result from studio_model_predict_result where mid =\'' + mid + '\''
# 通过pandas执行sql语句得到dataframe类型的数据结果
df = pd.read_sql(sql, con=conn)
  • 插入sql
# 数据库连接的获取同上
# 获取操作游标,也就是开始操作
cur = conn.cursor() 
# 插入sql语句
sql = """
      insert into studio_job_step_files (id,jobid,file_type,path,remark)
      values (%s,%s,%s,%s,%s)
    "
""
# 生成uuid唯一id
id = str(uuid.uuid1()).replace('-''')
# 执行sql
cur.execute(sql, (id, jobid, '100', py_test, result))
# 提交事务
conn.commit()
# 关闭连接
conn.close()
  • 使用alluxo(内存文件系统)

alluxio client源码

https://gitee.com/pingfanrenbiji/simple-model-monitor-demo/tree/master/alluxio

连接alluxio

import alluxio

client = alluxio.Client(alluxio_ip, alluxio_port)


获取文件

got = None
# alluxio文件路径
path = "/poc/d1e5a1efbe764f9d80b200d3a90c5bdf"
with client.open(path, 'r') as f:
  got = f.read()

写入本地文件

# 本地文件路径
py_test_csv = "test.csv"
# 将读取到的alluxio文件写入本地文件
with open(py_test_csv, 'w') as f:
  f.write(got.decode('utf-8'))

注:write方法需要字符串类型的参数
got是字节流类型 通过decode解码得到字符串类型

读取csv文件得到dataframe类型的数据

df = pd.read_csv(py_test_csv, sep=",")
# 获取执行列
df['y']

上传文件

# 将dataframe类型的数据写入alluxio文件
# result_detail是dataframe类型的数据
with client.open(py_test, 'w') as f:
f.write(bytes(result_detail.to_json().encode("utf8")))

后记

上述过程花了2天时间

从对PSI概念不了解到慢慢了解

没有写过Python工程的代码 连if else语法都要现学 到学会使用Python语言解决问题

所以呀

要相信自己的潜力 

以解决问题为目标 不断学习相关的知识

想办法使用能够使用的资源 

你一定可以战胜它的