vlambda博客
学习文章列表

【Flink】处理数据实时归因踩坑

前一段时间因为想对某些实时数据进行归因,埋点里面没有归因字段,必须用到双流JOIN,第一次尝试,总结了一些坑。【Flink】处理数据实时归因踩坑【Flink】处理数据实时归因踩坑【Flink】处理数据实时归因踩坑
举个例子,B站上有播放数据和分享数据,分享数据里面没得归因,但是播放有,所以想对分享通过播放归因试试看,一般来说,用户要么是播放结束前分享,要么是播放结束后分享,在这个场景下,尝试了一些可能的方案。

离线归因场景

-- 通过模拟session的方式,对用户的行为进行归因。即对同一个连续会话窗口的KEY排序,然后归到一个元素上。SELECT FIRST_VALUE(refer) OVER(PARTITION BY KEY,session ORDER BY logtime) as referFROM( SELECT    IF(logtime - LAG(logtime)>时长, 1, 0) OVER(PARTITION BY KEY ORDER BY logtime) as session -- 日志相差太大时间就是一个新的session FROM [表名] WHERE [过滤条件]) t
比如连续的日志行为超过了20min就是一个新的session窗口。

【Flink】处理数据实时归因踩坑


实时归因场景

本来说实时也想实现类似功能,但是要对两个流的数据全排序,实时里面没尝试过,感觉状态会炸掉。

【Flink】处理数据实时归因踩坑

由于上游处理的问题,我不能保证Kafka的每个分区都能够是真实的日志时间顺序,毕竟如果有用户是故意将时间设置成了未来时间,或者打点有问题,那么用户链路行为就会断,所以只能保证写入Kafka的服务端时间是顺序的,不过这已经没啥意义了。

目前想到的方案就是SHAREPLAYLEFT JOIN,然后排序,LEFT JOIN是为了实现离线的某个action的过滤(就是说离线归因对SHAREPLAY排序,最终选择action=share即可),LEFT JOIN之后的结果就是说我的某个share可能来源于多个播放,这时候要根据不同业务场景去判断留下哪一个。【Flink】处理数据实时归因踩坑【Flink】处理数据实时归因踩坑【Flink】处理数据实时归因踩坑【Flink】处理数据实时归因踩坑

-- 代码实现/*share流和play流需要自己在自己平台提前配置好watermark和eventtime**/
-- 先创建一个LEFT JOIN之后的视图CREATE VIEW share_join_play AS(SELECT share.*, play.*FROM shareLEFT JOIN playON( share.userid = play.userid AND share.deviceid = play.deviceid AND share.os = play.os AND share.os_ver = play.os_ver AND share.app_ver = play.app_ver AND share.resource_type = play.resource_type AND share.resource_id = play.resource_id AND share.eventtime between play.eventtime - INTERVAL '15' MINUTE AND share.eventtime +INTERVAL '15' MINUTE -- 关联share前后十分钟的播放即可  -- 关联10min后是为了避免埋点在某些情况下,前面的播放埋点可能没有上报或者出错,那就往后面这个资源的播放归,因为某些情况下可能用户没播放就开始分享。这个窗口的逻辑根据自己的业务场景去定。);
-- 根据JOIN之后的视图进行排序CREATE VIEW share_attr_play AS( SELECT share_refer FROM ( SELECT play.refer AS share_refer, -- 将JOIN出来的所有可能refer,拿出来作为share的refer -- 优先分享前的播放归因,且优先最先的播放归因。这个根据自己业务需求可以定义多个ROW_NUMBER ROW_NUMBER() OVER( PARTITION BY dt,userid,deviceid,os,os_ver,app_ver,resource_type,resource_id ORDER BY IF(play.logtime - share.logtime < 0, 1, 0) DESC, ABS(play.logtime - share.logtime) ) AS rn FROM share_join_play ) t where rn = 1 -- 取第一个);
-- 然后根据某些KEY去统计 这里统计每一天每个资源id的各个分享归因的次数SELECT share_refer, dt, resource_id, sum(1) as pvFRON share_attr_playGROUP BY share_refer, dt, resource_id


数据流的基本过程就是下面这样子,其中的结果可以尝试开启MiniBatch优化,和Local-GlobalAggr优化。

前面JOIN都没问题,直到RANK出来的结果,它是一个 Retract 流,因为我们的数据有先来后到,日志时间控制不了的,比如有两条JOIN结果A和B,其中A的PLAY比SHARE时间早1秒,但是B的早5秒,由于数据延迟的原因,这个B数据可能晚来,那么在B来之前,这个RANK发到下游的结果应该是归到了A,所以下发了一个INSERT A。一旦B来了,那么就会下发DELETE A和INSERT B。这时候如果直接写入外部存储就会有问题。而GROUP BY恰好可以处理这种流,不过同样的,它下发的仍然是Retract流,一种是INSERT,一种是DELETE。



【Flink】处理数据实时归因踩坑



踩坑一

由于Group By之后的也是Retract流,导致对外部存储的选择与要求就有点不一样了。

bear
  • 第一种是如果Flink本身支持识别这种的INSERT和DELETE流的话,可以再group by time window,每隔一分钟计算一次,DELETE代表-,INSERT代表+,然后sum一下。

  • 第二种就是Flink平台自己将DELETE过滤掉,使用一种主键更新SET的外部存储,在这种情况下,每一个KEY只会由Flink的一个PARTITION发出,由它INSERT到外部主键更新的存储中,以一种覆盖的操作代替Flink的DELETE流,并且这个INSERT流是正常的累加结果。需要注意的就是要让外部存储的key和自己Group By的key保持一致。


踩坑二

JOIN状态非常大,似乎一直没有被清理掉

bear

【Flink】处理数据实时归因踩坑


主要体现在JOIN的那个地方。经过仔细排查,发现INTERVAL 15 MINUTE的时间窗口貌似没有将过期数据处理掉,由于GROUP BY需要一整天的状态,因此我设置了table.exec.state.ttl为24h,这样貌似导致join的窗口过期也时效了。导致一个没清理。
我思考了两种方式,一种是去掉table.exec.state.ttl参数,但是不清楚Group by和rank时状态啥时候清理,目前没有明确的说法。
我采用第二种,将两算子拆分开来,JOIN任务照常计算。RANK和Group by仍然加上table.exec.state.ttl自己手动控制状态过期清理。





踩坑三

拆分任务后,发现JOIN不出来任何结果。

bear
  • 查看拓扑图,发现watermark已经超出当前时间了。

  • 查看监控输出QPS,一条也关联不上(输出QPS是0)。

  • 状态很快就被清理了。正常情况,双流都会保存下来30min,来等待迟到的数据。

查看Interval Join的实现源码,发现在处理流两边的数据之前,会调用IsLate将当前logtime和watermark进行比较,如果过期了就不处理,并且会根据这个watermark判断数据过期清理状态。唯一丢数据的可能性就在这里,怀疑的是,在某个时候,他们都出现了未来时间,这样每个并行度发出的watermark可能正好都是未来时间,这样到了IntervalJoin时,接收到的都是未来时间戳的watermark,将本地的watermark置为了未来时间。
解决方式:和业务沟通,需要将未来时间设置成当前时间,这样任务就能够正常跑了。