在一个现代营销平台中,要实现实时的用户标签计算及营销触达,主要面临以下挑战:
海量的行为日志数据接入在面临千万日活和数十亿事件的用户规模下,行为日志数据能够被及时的处理是一个基本要求。
高维度的用户标签数据营销规则的触发本质上就是依赖各个用户的标签数据,而不同用户的标签维度离散且日益增长,如何进行高效的数据存储和实时查询、聚合,是我们面临的巨大挑战。
低延时的营销活动触达保证对于一些依赖事件实时触发的营销活动,需要保证整个链路在毫秒级内完成。
用户会话级别内的行为指标分析,带来的庞大的状态存储开销。
系统的稳定性以及可扩展、容错性,以及在系统出现故障或历史数据重跑之后的数据的一致性保证。
通过以下例子可以大致理解我们的部分应用场景。
根据规则周期性计算积分敏感人群,设计活动,定期向该人群推送积分兑换礼券、打折券等活动(通过短信、微信公众号等),以促进该类人群的交易转化。需要解决的关键问题包括:
积分敏感人群的筛选,主要依赖于业务方对该类群体的业务定义,平台自定义了丰富的算子可以完成规则的构建。
能够及时准确的根据规则计算出人群,依赖于离线批处理任务的稳定性。在新的人群计算的同时可以算出该人群的流入人群、流出人群,也可以针对性的设计营销活动。
通过判断一段时间内(天 / 周级别)用户行为(如交易事件发生)满足指定条件,经过其他条件筛选、去除僵尸用户之后,实时地向用户推送活动。例:用户当日刷卡消费满三笔可参加积分抽奖活动。需要解决的关键问题包括:
单渠道内需要立即检测到事件发生,并且能够快速的判断出是否符合条件(当日刷卡消费满三笔),低延迟地完成一次用户触达(短信、公众号推送等),让用户参与活动。鉴于在行为表中直接筛选查询无法满足低延迟的需求,我们采取了预处理的方式,使用一种特殊类型的数据结构存储每个用户的一段时间内的行为,支持实时的对发生过的事件内容进行统计(可以根据配置记录不同时间区间的事件)。
跨渠道事件的侦测,需要提前对两个(或多个)渠道内的用户进行打通,一般根据渠道内的身份标识(如手机号)进行匹配,主渠道需要订阅另一个渠道的事件,才能够进行侦测。同一用户会话内的行为触发场景通过分析单个用户在同一次会话(Session)期间的行为轨迹,比如如在一个会话期间内发生过添加购物车行为却没有发生购买行为的,我们可以有针对性地做一些实时的营销激励。
会话的切分逻辑以及会话内用户海量行为状态,带来对存储及实时计算的压力。
会话中的行为轨迹灵活多变,需要提供一套统一的足够灵活的配置规则引擎。
需要支持会话内特殊时点(如会话开始 / 会话结束 / 是否新用户等)的标签计算和营销活动触发。
Flink 是一个针对流数据和批数据的分布式处理引擎,基于 Java 实现。Flink 所要处理的主要场景就是流数据,会把所有任务当成流来处理,而批数据只是流数据的一个极限特例而已。Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。Flink 可以定制化内存管理,相对于 Spark,Flink 并没有将内存完全交给应用层。
我们需要不断的处理用户事件(通过 SDK 或其他方式接入消息队列)并做一些标签实时计算和营销活动,因此流式计算引擎是非常适合的。而在业界比较常见的三款流式计算框架(SparkStreaming、Storm、Flink)中,我们选择了 Flink 的原因有以下几点:
首先业务需求需要对事件(用户行为)提供实时的响应(毫秒级),对低延迟的要求比较高,因此我们放弃了使用 SparkStreaming,因为 SparkStreaming 是基于微批的处理思想,只能提供准实时(秒级)。而 StructuredStreaming 目前还比较早期,故暂时不考虑。
相较于 Storm,我们希望选择一个批流统一的技术栈和生态圈,并且有更加活跃的社区。从目前的版本迭代以及项目的 star 数来看,Flink 显然更胜一筹。
Flink 丰富的状态类型和天然的分布式的状态存储和恢复机制,让开发人员可以更加聚焦在业务实现中,而不再需要考虑依赖外部存储来保留中间结果,并且在服务异常情况下的数据恢复问题。
我们的不少业务场景有对窗口操作(Window)的依赖,Flink 提供了比 Storm 更加丰富的窗口操作。
目前 Flink 越来越受到关注,社区的活跃度也在增加,相信未来会更加的完善,基于以上几点,选择了 Flink 作为我们的统一计算框架。
通过上图可以看到,数据处理平台主要分为三个核心模块:数据统一收集、离线数据处理、实时数据处理。
数据统一收集模块:通过 rest 方式将各个数据源的数据接入到我们的平台,并通过 flume 将日志双写到 Kafka 和 Hdfs,供后续实时 / 离线任务的处理。
实时数据处理模块:主要负责用户标签计算、会话内的行为轨迹分析、跨渠道间的 IDMapping 以及营销活动事件触发。
离线数据处理模块:主要负责标签的初始化操作、标签 / 人群的全量计算以及计算任务的部署。在存储层,因为需要支持海量用户高维标签的随机读写,我们选择使用 Hbase 用于支持实时 / 离线计算。同时,对于人群画像、指标分析等 OLAP 功能,我们选择使用 ElasticSearch。
实时流程中主要基于渠道的方式来组织数据及其数据流式处理,各个渠道独立运行自身的标签计算流程。流处理的全流程概览,如下图所示:
上图中,包含了在多渠道的场景下,规划的独立 / 共享的 Kafka Topic 的情况,以及每个对应的 Topic 被消费处理的 Flink Streaming Job 的列表。基于上图,能够了解到流处理过程中基本的数据流,以及数据流最终操作的外部存储系统。在具体实现中,我们有以下几个需要注意的问题:
我们所有的实时计算逻辑都依赖于用户行为事件,而确保任务是按事件发生的先后顺序处理,是我们必须保证的,否则数据就会出现不一致的情况。因此,我们通过在数据接入层进行了按用户 ID 做分组,保证单个用户的数据落在 Kafka 的同一个 partition 中。
Flink 任务中的时间处理类型我们应该选择哪个?
默认情况下,Flink 使用的是 ProcessingTime 来作为 TimeCharacteristic。但是我们需要依赖于事件发生时间作为我们的 watermark 生成及 session 窗口触发,因此我们选择 TimeCharacteristic 为 EventTime。
单用户的 session 窗口如何生成,以及 session 窗口如何实现会话开始 / 结束的触发?
目前 Flink 的窗口函数中已经支持了 SessionWindow 的窗口机制。因此我们只需要根据 UID 分组然后直接使用即可。但是,默认情况下所有的 Window 只会在结束之后,触发我们的窗口函数。因此为了满足在事件不同条件下的触发,我们需要自定义 Trigger,并且通过利用 State 机制来传递当前的触发类型,来以便后续的 window 处理程序能够识别出不同的触发类型,从而进行不同的逻辑处理。
当事件流在一段时间内处于空闲状态,如何保证 session window 能够被照常触发?
考虑到我们在容忍一定延迟数据的情况下,希望在 session window 结束时能够被尽早的触发。所以如果指定时间内(此处我们设置为 session 的时长,即可认为是 session timeout 时间)依然没有数据流入系统,那么我们将会尝试干预 watermark 机制,当它感知到当前流处于 idle 状态时,会尝试自动递增上升 watermark,当当前的 watermark 累计递增了 session timeout 时间时,就相应的达到了 session window 的及时触发效果。当然,这样做的风险是 watermark 水位被提高了,但在我们场景中这个是可以接受的。目前在 Flink 社区中也有关于 stream idle 的讨论,感兴趣的可以参考一下【FLINK-5018】。
5.session window 窗口太大(定义为 30 分钟),导致大量的 state 如何处理?
默认情况下,Flink 使用 JobManager 的堆内存进行状态的存储,同时可以配合使用 FileSystem 进行状态的持久化,但是对于我们的场景中,状态太大,已经超过了默认的 state 大小(默认为 5M),同时在每次进行 checkpoint 的时候,默认都是全量的 checkpoint, 导致了 checkpoint 处理过慢而超时,甚至数据对齐导致的背压,严重影响了程序的处理能力。因此,我们选择使用 rocksdb 来进行状态存储,主要他可以支持增量的 checkpoint,并且减少了 JobManager 的压力。
实时 Flink 任务中如何动态订阅配置信息,并同步到所有相关的 Task 中?
目前我们的所有配置变更都存储在 Kafka 中,任何依赖配置的变更,都可以通过订阅 Kafka topic 来获得最新的配置信息。最初的时候,我们通过使用 Flink 的 connect API 将数据流和配置流关联起来。当有新的配置变化后,可以通过 broadcast 将所有配置同步到各个 Task 上。但是,存在的问题是 broadcast 只能将配置信息下发给直属下游,而并不是它的所有下游节点,即缺少一个全局的配置。因此,我们的解决思路是,通过在每个 TaskManager 实例启动时,实例化一个 Kafka Consumer,然后通过采用监听者模式,去推送最新的配置给当前 JVM 中的所有订阅该配置类型的算子实例。这样就保证了单个 JVM 中共享同一套配置信息,同时任意的算子实例都可以订阅自己依赖的配置,而无需关心上下游的问题。
离线任务的需求主要有以下几点:
对于新创建(或修改)的特性,进行一次全量用户的初始值计算,这个过程结束后再由实时任务对每个用户的特性值进行更新。
人群计算(以及附属人群),目前主要的场景是周期性计算或触发计算,通过离线计算完成。
多渠道数据打通任务。
其他计算任务和平台异常情况的数据回补场景。
首先每个计算任务开始前都会将计算对象的状态标识为“正在计算”,此时计算对象的规则是不可以被修改的,防止计算结果不一致。计算完成后,再次更新计算对象的状态为“正常”。其次由于实时和离线任务都会对同一张表进行更新,虽然“正在计算”的特性不会在实时任务中处理,但是该特性依赖的其他特性还是会正常被计算,如果此时实时和离线任务同时进行,最后很大可能造成计算结果不一致,因此目前的数据存储模型决定了实时和离线任务不能重叠计算。现在采取的方案是,离线计算在凌晨 2~5 点期间完成,此时实时事件发生较少(几乎可以忽略不计),影响较小。
然后再介绍下特性离线计算的方式。特性表存储在 HBase 中,通过 API 对 HBase 数据进行大批量数据扫描计算,不仅效率低下,而且占用 HBase 的计算资源(HRegionServer)。我们通过解析 HFile 的方式,分布式读取数据并进行计算,然后批量写回特性表进行更新,减少 HBase 的服务器压力,并提高计算效率,可以进行横向扩展。
在离线批量计算任务中,我们还使用了 Flink SQL/TableAPI,例如在渠道打通的任务中,使用 SQL 和 TableAPI 以非常直观的方式组合关系运算符,如 select、filter 和 join 等。不过 TableAPI 和 SQL 在 Flink 中还没有支持的很完善,正在积极开发中(引自 Flink 官网)
目前的人群计算都是离线,T+1 的数据,未来根据需求可以提供准实时的计算。
目前批流结合的方式是,离线批处理任务在实时事件最少发生的时间段完成。但是不能保证完成没有影响计算结果的事件发生。之后会进行改进,首先暂停实时事件的消费,在离线任务完成后再恢复消费。
各个计算模块的指标监控和实时数据流质量异常检测。
更加完备的数据回补方案以及如何更加无缝的保证批流结合。
罗国灏,TalkingData 大数据研发工程师,目前主要负责营销场景下的用户标签实时计算研发工作。
王涛,TalkingData 高级架构师,多年大型外企和国内互联网公司企业软件和大数据系统的架构和设计经验,负责公司企业营销产品的架构和研发。
孙博文,TalkingData 数据工程师,多年分布式计算和存储领域研发经验,负责 SMCE 企业侧产品研发,一方数据的处理分析。
你也「在看」吗?👇