【Flink】处理数据实时归因踩坑
离线归因场景
-- 通过模拟session的方式,对用户的行为进行归因。即对同一个连续会话窗口的KEY排序,然后归到一个元素上。
SELECT
FIRST_VALUE(refer) OVER(PARTITION BY KEY,session ORDER BY logtime) as refer
FROM
(
SELECT
IF(logtime - LAG(logtime)>时长, 1, 0) OVER(PARTITION BY KEY ORDER BY logtime) as session -- 日志相差太大时间就是一个新的session
FROM [表名]
WHERE [过滤条件]
) t
实时归因场景
由于上游处理的问题,我不能保证Kafka的每个分区都能够是真实的日志时间顺序,毕竟如果有用户是故意将时间设置成了未来时间,或者打点有问题,那么用户链路行为就会断,所以只能保证写入Kafka的服务端时间是顺序的,不过这已经没啥意义了。
目前想到的方案就是对SHARE和PLAY先LEFT JOIN,然后排序,LEFT JOIN是为了实现离线的某个action的过滤(就是说离线归因对SHARE和PLAY排序,最终选择action=share即可),LEFT JOIN之后的结果就是说我的某个share可能来源于多个播放,这时候要根据不同业务场景去判断留下哪一个。
-- 代码实现
/*
share流和play流需要自己在自己平台提前配置好watermark和eventtime
**/
-- 先创建一个LEFT JOIN之后的视图
CREATE VIEW share_join_play AS
(
SELECT
share.*,
play.*
FROM share
LEFT JOIN play
ON(
share.userid = play.userid
AND share.deviceid = play.deviceid
AND share.os = play.os
AND share.os_ver = play.os_ver
AND share.app_ver = play.app_ver
AND share.resource_type = play.resource_type
AND share.resource_id = play.resource_id
AND share.eventtime between play.eventtime - INTERVAL '15' MINUTE AND share.eventtime +INTERVAL '15' MINUTE
-- 关联share前后十分钟的播放即可
-- 关联10min后是为了避免埋点在某些情况下,前面的播放埋点可能没有上报或者出错,那就往后面这个资源的播放归,因为某些情况下可能用户没播放就开始分享。这个窗口的逻辑根据自己的业务场景去定。
);
-- 根据JOIN之后的视图进行排序
CREATE VIEW share_attr_play AS
(
SELECT
share_refer
FROM
(
SELECT
play.refer AS share_refer, -- 将JOIN出来的所有可能refer,拿出来作为share的refer
-- 优先分享前的播放归因,且优先最先的播放归因。这个根据自己业务需求可以定义多个ROW_NUMBER
ROW_NUMBER() OVER(
PARTITION BY dt,userid,deviceid,os,os_ver,app_ver,resource_type,resource_id
ORDER BY IF(play.logtime - share.logtime < 0, 1, 0) DESC, ABS(play.logtime - share.logtime)
) AS rn
FROM share_join_play
) t
where rn = 1 -- 取第一个
);
-- 然后根据某些KEY去统计 这里统计每一天每个资源id的各个分享归因的次数
SELECT
share_refer, dt, resource_id, sum(1) as pv
FRON share_attr_play
GROUP BY share_refer, dt, resource_id
数据流的基本过程就是下面这样子,其中的结果可以尝试开启MiniBatch优化,和Local-GlobalAggr优化。
踩坑一
由于Group By之后的也是Retract流,导致对外部存储的选择与要求就有点不一样了。
bear
第一种是如果Flink本身支持识别这种的INSERT和DELETE流的话,可以再group by time window,每隔一分钟计算一次,DELETE代表-,INSERT代表+,然后sum一下。
第二种就是Flink平台自己将DELETE过滤掉,使用一种主键更新SET的外部存储,在这种情况下,每一个KEY只会由Flink的一个PARTITION发出,由它INSERT到外部主键更新的存储中,以一种覆盖的操作代替Flink的DELETE流,并且这个INSERT流是正常的累加结果。需要注意的就是要让外部存储的key和自己Group By的key保持一致。
踩坑二
JOIN状态非常大,似乎一直没有被清理掉
bear
踩坑三
拆分任务后,发现JOIN不出来任何结果。
bear
查看拓扑图,发现watermark已经超出当前时间了。
查看监控输出QPS,一条也关联不上(输出QPS是0)。
状态很快就被清理了。正常情况,双流都会保存下来30min,来等待迟到的数据。