来源 | 微信公众号“腾讯大数据”(ID:tencentbigdata)
2019 年 4 月 1-2 日,Flink Forward 2019 San Francisco 会议在旧金山召开。Flink Forward 会议邀请了来自 Google、Uber、Netflix 和 Alibaba 等公司在实时计算领域的顶尖专家和一线实践者,深入讨论了 Flink 社区的最新进展和发展趋势,以及 Flink 在业界的应用实践。随着近年来对 Flink 技术的广泛应用以及对 Flink 社区的活跃贡献,腾讯也受邀参加了会议并以主题 Developing and Operating Real-Time Applications at Tencent 介绍了腾讯大数据在实时计算平台建设上的工作。
近年来,实时计算在腾讯得到了越来越广泛的应用。在腾讯内部,实时计算应用主要分为以下四类:
ETL:ETL 应该是目前实时计算最普遍的应用场景。例如在 TDBank 的数据链路中,TDSort 读取消息缓存系统 Tube 中的消息,通过流数据处理系统将消息队列中的数据进行实时分拣,并落地到 HDFS 接口机集群,并将最终分拣后的数据由加载到 TDW 中。
监控系统:随着服务数量和机器规模的不断增长,线上环境日益复杂,对监控和报警系统也提出了更高的要求。监控系统需要能够对产品和服务进行多维度的监控,对指标数据进行实时的聚合和分析,并支持方便灵活的报警规则设置。
实时 BI:实时的业务报表对产品运营有着非常大的帮助,能够帮助我们的运营人员实时掌握产品数据,及时制定运营策略,通过更好的时效性获取竞争优势。
在线学习:实时计算目前在推荐、广告和搜索等产品中也有着十分广泛的应用。一般来说,用户兴趣会在多个时间维度上持续变化。通过对用户行为进行实时检测,我们能够及时获取用户的当前兴趣并提供更精准的用户行为预测。
目前腾讯的实时计算的规模已经十分庞大。数据平台部实时计算团队每天需要处理超过了 17 万亿条数据,其中每秒接入的数据峰值达到了 2.1 亿条。
为了提高用户流计算任务持续集成和持续发布的效率,实时计算团队从 2017 年开始围绕 Flink 打造了 Oceanus (http://data.qq.com),一个集开发、测试、部署和运维于一体的一站式可视化实时计算平台。Oceanus 集成了应用管理、计算引擎和资源管理等功能,提供了三种不同的应用开发方式,包括画布,SQL 和 Jar,来满足不同用户的开发需求,同时通过日志、监控、运维等周边服务打通了应用的整个生命周期。
Oceanus 还研发了 Oceanus-ML 来提高在线学习任务的开发效率。Oeanus-ML 提供端到端的在线机器学习,涵盖数据接入,数据处理,特征工程,算法训练,模型评估,模型部署整个机器学习流程。通过 Oceanus-ML,用户可以方便地利用完备的数据处理函数,丰富的在线学习算法来构建自己的在线学习任务,轻松地完成模型训练和评估,同时可以一键部署模型,管理模型的整个生命周期。
在完成作业开发之后,用户可以通过 Oceanus 对作业进行测试、配置和部署。Oceanus 为用户程序提供了一系列的工具来协助作业测试。用户既可以使用 Oceanus 提供的一键生成功能产生测试数据,也可以自己向 Oceanus 上传自己的的测试数据,通过对比预期结果和实际结果来验证应用逻辑的正确性。Oceanus 依托腾讯内部的资源调度系统 Gaia 来进行资源管理和作业部署。用户可以通过 Oceanus 配置作业所需要的 CPU 和内存资源,并指定作业需要部署的集群。当用户完成配置之后,Oceanus 会向 Gaia 申请对应的资源并将作业提交到 Gaia 上运行。
Oceanus 对 Flink 作业运行时的多个运行指标进行采集,包括 Task Manger 的内存,I/O 和 GC 等。通过这些丰富的运行指标,用户能够很好的了解应用运行的情况,并在出现异常时能协助用户及时的定位问题。运维人员则可以通过这些采集到的指标,设置报警策略并实现精细化的运营。
大部分 Oceanus 的用户可以使用画布方便的构建他们的实时计算应用。Oceanus 提供了常见的流计算算子。在开发实时计算应用时,用户将需要的算子拖拽到画布上,配置这些算子的属性并将这些算子连接,这样就构建好了一个流计算应用。这种构建方式十分简单,不需要用户了解底层实现的细节,也不需要掌握 SQL 等语言的语法,使得用户能够专注于业务逻辑。
Oceanus 同样也提供了 SQL 和 Jar 的方式来开发实时计算作业。在使用 SQL 和 Jar 进行开发时,一个比较麻烦的地方就是作业的配置问题。例如 SQL 脚本没有提供任何方式来允许用户进行作业资源配置。尽管 Flink 的 DataStream API 为用户提供了接口来修改并发度和资源等配置,但为了灵活修改这些配置,用户常常需要自己通过外部配置文件的进行处理。为了用户能够在使用 SQL 和 Jar 进行开发时也能方便的进行作业配置,Oceanus 会首先对用户提交的 SQL 脚本和 JAR 包进行解析和编译,生成作业执行的 JobGraph,并可视化在页面上。用户之后可以根据可视化的 JobGraph 来进行作业配置。通过提供可视化的配置方式,用户作业开发的效率可以得到极大的提高。
配置好的 JobGraph 之后将被提交到 Yarn 集群上执行。为了能够更准确的获取更多的作业信息,Oceanus 放弃了 Flink 默认的提交方式,而通过增强过的 ClusterClient 接口来提交作业。通过 ClusterClient,Oceanus 可以获得作业的 JobID, Yarn Application ID 等信息,并利用这些信息来提供作业的生命周期管理。
Oceanus 对正在运行的作业采集了大量的指标,通过这些指标来监控作业运行情况,并在发生故障时定位原因。为了提高运维效率,我们根据长期积累的经验对运行指标进行了筛选,并对 Flink UI 进行了重构来合理展示这些指标。
每个 task 输入和输出队列的使用率是在实际生产中非常有用的运行指标。通常来说,当一个 task 的输入队列被占满,而输出队列为空时,说明这个 task 的上游的数据产生速度已经超过了这个 task 的处理能力,导致了这个 task 的输入出现了堆积。当一个作业中出现这样的 task 时,我们就需要通过性能优化或者增加并发度的方式来提高这个 task 的处理性能。
输入和输出的 TPS 也是在作业运行中的关键指标。通常来说,一个 task 的输出 TPS 和输入 TPS 之间的比例并不会随着并发度的变化而变化。我们利用这个性质来确定作业运行时的并发度。当确定作业并发度时,我们首先将所有 task 的并发度设置为 1 并启动作业。此时这个作业显然是无法处理上游的数据的,因此大部分 task 的单机处理能力会被打满,其输入和输出 TPS 可以达到最大值。根据需要的 TPS 和单机最大 TPS,我们可以估算出每个 task 的并发度,并重新启动。之后根据前面提到的输入输出队列的使用率,我们对作业并发度进行一定的调整来去除作业中的性能瓶颈。一般通过几次调整之后,我们就可以得到较为理想的作业并发度配置。
在后面,我们希望能够实现自动化脚本或者优化器来简化作业并发的配置。不过这仍然是一个非常有挑战的工作。一个主要的难点在于窗口算子的处理。很多窗口算子在平时较为空闲,但在窗口触发时会一下子发送大量的结果。在那一瞬间,如果窗口算子的并发度不够就会出现一定的结果延迟。如何平衡窗口算子在空闲和触发时的并发度目前看来仍然需要很多的 trade-off。
当一个 task 的最大和最小 TPS 之间出现较大的差值时,一般就意味着作业中出现了负载倾斜。负载倾斜会对作业的性能造成较大的影响,同时也很难通过增加并发度的方式来提高性能。为了减少负载倾斜对作业性能的影响,我们引入了 Local Keyed Streams。相关工作将在后面的第三部分进行介绍。
在即将发布的新版 Oceanus 中,我们还对 TaskExecutor 的线程信息进行了采集。这些线程信息能够很好地帮助用户定位发生的问题。例如 checkpoint 可能会由于多种多样的原因而超时。当用户实现的 source function 在被 IO 或者网络堵塞时并没有释放 checkpoint 锁,那么正在执行的 checkpoint 可能就会由于无法及时获取锁而超时。用户也有可能实现了一个堵塞的 checkpoint 函数,由于较慢的 HDFS 写入或者其他原因而导致 checkpoint 超时。通过观察线程信息,我们就可以容易的知道 checkpoint 超时的原因。
这些采集的线程信息也能对程序的性能优化提供很多帮助。一般而言,当一个 task 线程的 cpu 使用率达到 100% 时,就说明这个 task 的执行并没有受到加锁,I/O 或者网络等操作的影响。在上图中,我们展示了一个 Word Count 程序的 Task Executor 的线程信息。在 Word Count 程序,我们有一个 source task 在持续不断的发送 word,还有一个 map task 对出现的 word 进行计数。可以看到,在 Task Executor 中,map 线程的 cpu 使用率几乎达到了 100%,这说明其的执行是没有太大问题的。而 source 线程的 cpu 使用率仅仅只有 80%,这说明其的性能受到了影响。观察线程堆栈,我们可以发现 source 线程时常会堵塞在数据的发送上。这是很好理解的,因为每产生一个 word,map 线程都需要比 source 线程执行更多的指令。也就是说,map 线程的数据处理能力比 source 线程的生产能力要低。为了提高这个 Word Count 程序的性能,我们就需要保证 map 线程的数目比 source 线程的数目多一点。
除了在 Oceanus 上提供方便强大的接口和工具之外,我们还对 Flink 内核进行了大量的改进来提高其可用性和可靠性。这些改进主要包括以下几个部分:
作业管理相关:我们对 Flink 的作业管理的改进主要以提高作业执行的可靠性为主,包括对分布式环境下的 leader 选举的重构和无需作业重启的 Job Master 恢复机制等。同时,我们也正在研究和开发细粒度恢复机制来减少发生故障时需要重启的 task 数目。
资源调度相关:我们对 Flink 的资源调度,特别是在 Yarn 集群上的资源调度进行了重构,以提供更好的资源使用率。同时,我们也正在研究如何使用分布式和异步的资源调度框架来提高超大并发度的作业的资源调度效率。
可用性相关:我们在 Flink 中提供了多个算子,包括 local keyby, incremental windows, dim join 等。这些算子能够很好的提高用户开发程序的效率和程序执行的性能。
Flink 的 master 负责资源申请、任务调度、checkpoint 协调,并响应用户请求。在任何时刻,一个集群中都只可以有一个 master 节点可以在工作状态。当集群中出现多个 master 节点时,就需要通过 leader 选举确定工作的 master 节点。
在分布式环境下进行 Leader 选举是分布式系统中的一个经典问题。目前 Flink 依赖 Zookeeper 进行 leader 选举,并将当选的 leader 的信息保存在 Zookeeper 上以实现服务发现。但在复杂的集群环境中,Flink 当前的实现并不能很好的保证 leader 选举和发布的正确性。
如上图左侧所示,当 JM1 获得 leader 之后,其需要在 Zookeeper 发布其地址以供其他节点来发现自己。但如果在其发布地址之前,JM1 发生了 Full GC,那么集群就可以陷入混乱之中。其长时间的 GC 可能会导致其丢失 leader 以及和 Yarn 之间的心跳连接。此时一个新的 master 节点, JM2, 可能会被 Yarn 拉起。JM2 在获得 leader 之后会将其地址发布在集群中。当如果此时 JM1 从 Full GC 中恢复过来,并继续执行之前的代码,将其地址发布在集群中,那么 JM1 的地址将会覆盖 JM2 的地址导致集群混乱。
另一个由于 leader 选举导致的常见问题是 checkpoint 的并发访问。当一个 master 丢失 leader 节点之后,其需要立即停止其所有正在进行的工作并退出。但是如果此时旧 master 的 Checkpoint Coordinator 正在完成 checkpoint,那么退出方法将无法获取到锁而执行。此时,在已经丢失了 leader 的情况下,旧 master 仍然有机会完成一个新的 checkpoint。而此时,新 master 却会从一个较旧的 checkpoint 进行恢复。目前 Flink 使用了许多 tricky 的方法来保证多个 master 节点对 checkpoint 的并发访问不会导致作业无法从故障中恢复,但这些方法也导致我们目前无法对失败的 checkpoint 进行有效的脏数据清理。
为了上述问题,我们对 Flink 的 leader 选举和发布进行了重构。我们要求每个 master 节点在竞争 leader 时都创建一个 EMPHEMERAL 和 SEQUENTIAL 的 latch 节点。之后所有 master 节点会检查 latch 目录下所有的节点,序列号最小的那个节点将会被选举为 leader。
Zookeeper 的实现保证了创建的 latch 节点的序列号是递增的。所以如果一个 master 节点被选为 leader 之后,只要它的 latch 节点仍然存在,就意味着它的序列号仍然是所有 master 节点中最小的,它仍然是集群中的 leader。从而我们就可以通过检查一个 master 的 latch 节点是否存在来判断这个 master 是否已经丢失 leader。通过将 leader 地址的发布以及对 checkpoint 的修改等更新操作和对 latch 节点的检查放置在一个 Zookeeper 事务中,我们可以保证只有保有 leader 的 master 节点才可以对作业执行状态进行修改。
Master 节点会由于多种不同的原因而发生故障。目前在 master 重启时,Flink 会重启所有正在执行的 task,重新开始执行作业。在 Zookeeper 连接出现抖动时,集群中所有 task 都会重启,对 HDFS、Zookeeper 和 YARN 这些集群基本组件带来较大的压力,使得集群环境进一步恶化。
为了减少 master 恢复的开销,我们实现了无需作业重启的 master 恢复机制。首先,我们使用 Zookeeper 和心跳等手段来对 master 的状态进行监控。当 master 发生故障时,我们立即拉起一个新的 master。新 master 在启动时,并不会像第一次执行时那样申请资源并调度任务,而是会进入到 reconcile 阶段,等待 task 的汇报。
在另一边,task executor 在丢失了和 master 节点的连接之后,也不会立即杀死这个 master 负责的 task。相反,它将等待一段时间来发现新 master 的地址。如果在这段时间内发现了新 master 的地址,那么 task executor 将把其执行的 task 的信息汇报给新 master。
新 master 通过 task executor 汇报上来的信息来重建其 execution graph 和 slot pool。当所有 task 完成汇报,并且所有 task 在 master 恢复的这段时间内没有出现故障,那么 master 就可以直接切换作业状态到 running,并继续作业的执行。如果有 task 未能在规定时间内汇报,或者有 task 在这段时间内发生故障,那么 master 将切换到 failover 状态并通过重启恢复执行。
目前 Oceanus 依赖 YARN 来进行资源申请和任务调度。但现有 Flink 在 YARN 上资源分配的实现有着较大的问题,对作业可靠性带来了一定的风险。
在现在 Flink 的实现中,每个 task executor 都有着一定数目的 slot。这些 slot 的数目是在 task executor 启动时根据配置得到的。当为任务分配资源时,task 会按照可用 slot 的数目分配到空闲的 task executor 上,一个 task 占据一个 slot。在这个过程中,Flink 并不会考虑 task 实际使用的资源量以及 task executor 剩余可用的资源量。
这种资源分配的方式是十分危险的,会导致 task executor 向 YARN 申请的资源量和实际 task 使用的资源量不匹配。在集群资源紧张的时候,由于 YARN 会杀死那些超用资源的 container,作业就会进入不断重启的状态之中。
这种资源分配的方式也会导致较严重的资源浪费。在实际中每个算子所需的资源使用量是不同的。有的算子需要较多的 CPU 资源,而有的算子需要较少的内存资源。由于现在的配置中所有 task executor 具有相同的 slot 数目,所有 slot 都具有相同的资源,因此导致较为严重的资源碎片,无法充分利用集群资源。
为了避免由于资源分配导致的不稳定,我们修改了 Flink 在 YARN 上的资源申请协议。我们不再使用静态的 slot 配置,而是根据 task 申请动态的创建和销毁 slot。首先,我们要求用户能够为每个 operator 设置其所需的资源量。这样我们就可以根据 slot 中执行的 operator 来得到每个 slot 所需的资源量。当 Master 节点请求一个 slot 时,我们遍历所有的 task executor 并在空余资源量能够满足 slot 请求的 task executor 上创建一个新的 slot 提供给 master 节点。当这个 slot 中的 task 完成执行之后,这个 slot 也将被删除并将其资源归还给 task executor。这种动态的 slot 申请方式可以使得 Flink 的资源利用率极大的提高。
现实中,很多数据具有幂律分布。在处理这类数据时,作业执行性能就会由于负载倾斜而急剧下降。
以 WordCount 程序作为示例。为了统计每个出现 word 的次数,我们需要将每个 word 送到对应的 aggregator 上进行统计。当有部分 word 出现的次数远远超过其他 word 时,那么将只有少数的几个 aggregator 在执行,而其他的 aggregator 将空闲。当我们增加更多的 aggregator 时,因为绝大部分 word 仍然只会被发送到少数那几个 aggregator 上,程序性能也不会得到任何提高。
为了解决负载倾斜的问题,我们提供了 Local Keyby 算子,允许用户在 task 本地对数据流进行划分。划分得到的 Local keyed streams 和一般的 Keyed streams 是类似的。用户可以通过 RuntimeContext 访问 keyed state,也可以在数据流上执行窗口操作。利用 Local keyed streams,我们就可以在数据发送一端就进行本地的预聚合,统计一定时间段内 word 在当前 task 出现的次数。这些预聚合的结果然后被发送给下游,通过合并得到最终的结果。
但 Local keyed streams 的数据划分和分发和 keyed streams 不同。在 keyed streams 中,数据流会划分成多个 key group,每个 task 都会负责一部分 key group 的处理。每个 task 之间的 key group 是没有任何交集的。而由于 local keyed streams 是在 task 本地对数据流进行划分,因此每个 task 上的 key group range 都是 key group 全集。即如果数据流总共有 3 个 key group,那么每个 task 的 local key group range 都为 [1, 3]。
当并发度改变时,这些 local key group 将按照数据均匀分给新的 task。例如当 task 并发度从 3 变为 2 时,那么第一个 task 将分配到 5 个 local key group,而第二个 task 将被分配到 4 个。在这种情况下,同一个 task 将会被分配到多个具有相同 id 的 local key group。这些具有相同 id 的 local key group 将会被合并起来。当合并完成之后,所有 task 上的 local key group range 将仍然是 [1, 3]。对于 Reducing State, Aggregating State 以及 List State 来说,它们的合并是比较简单的。而对于 Value State, MapState 和 Folding State 等类型的数据而说,则需要用户提供自定义的合并函数来实现 local key group 的合并。
由于在一定时间段内发送给下游的数据量不过超过上游的并发度,下游的负载倾斜可以有效缓解。同时由于数据在上游一般没有较为严重的倾斜,程序性能不会由于负载倾斜而严重降低。我们测试了 WordCount 程序在不同数据倾斜程度下的吞吐。可以看到,在没有使用 local keyed streams 的情况下,程序性能随着倾斜程度而迅速下降,而使用 local keyed streams 之后,程序性能几乎不受影响。
为了方便用户开发画布和 SQL 程序,我们实现了超过 30 个的 Table API 和 SQL 函数。用户可以利用这些内置函数极大地提高实时计算应用的开发效率。此外,我们也对数据流和外部维表的 join 进行了大量优化,并补充了 Flink 还未支持的 Top N 功能。我们还提供了 incremental window 功能,允许用户能够在窗口未触发时得到窗口的当前结果。Incremental window 在多个应用场景中有着广泛的应用。例如用户可以利用 incremental window 统计活跃用户数目在一天内的增长情况。
我们后续的工作主要包括以下几个方面:
我们将继续研究提高 Flink 任务调度效率的方法。目前 Flink 使用单线程模型执行任务调度。在作业并发度较高的情况下,Flink 的任务调度效率较低。我们将尝试使用分布式和异步的任务调度模型来提高任务调度效率。
我们还将继续研究批流融合的 checkpoint 机制。目前 Flink 基于 chandy-lamport 算法来为流作业进行 checkpoint,而使用 upstream restart 的方式来将批作业从故障中恢复。我们可以将两者结合起来,提供一种统一的 checkpoint 机制,使得在流作业的恢复可以利用缓存的中间结果来减少所需重启的 task 数目,而在批作业中,通过对长时间运行的任务进行 checkpoint 来避免在发生故障时从头开始重新执行。
我们还将在制定执行计划时考虑数据在空间和时间维度上划分。例如在系统资源不足以支持对数据流式处理时,我们可以将数据在时间维度上进行划分,依次对划分好的数据进行处理。
腾讯大数据产品矩阵即将发布的 SuperSQL 项目,利用 Flink 的计算能力来满足跨数据中心,跨数据源的联合分析需求。它可以做到:数据源 SQL 下推,避免集群带宽资源浪费;单 DC 内 CBO(基于代价优化),生成最优的执行计划;跨 DC CBO,根据 DC 负载和资源选择最佳 DC 执行计算,从而获得更好的资源利用和更快的查询性能。
你也「在看」吗?👇