Uber 有一个复杂的“市场”,由乘客、司机、食客、餐厅等组成。在全球范围内运营该市场需要实时的情报和决策。例如,识别延迟的 Uber Eats 订单或放弃的购物车有助于我们的社区运营团队采取纠正措施。对于日常运营、事件分类和财务情报来说,拥有一个包含不同事件的实时仪表板是至关重要的,这些事件包括消费者需求、司机可用性或城市中发生的行程等等。
在过去的几年里,我们已经建立了一个自主服务平台来支持这样的用例,以及 Uber 不同部门的许多其他用例。该平台的核心构件是 Apache Pinot,这是一个分布式的在线分析处理(OnLine Analytical Processing,OLAP)系统,该系统用于对 TB 级数据执行低延迟的分析查询。在本文中,我们介绍了这一平台的细节,以及它如何融入 Uber 的生态系统。我们重点介绍了 Pinot 在 Uber 内部的演变,以及我们如何从少数用例扩展到多集群,全主动部署,为数百个用例提供支持,以毫秒级的延迟查询 TB 级规模的数据。
上图描述了实时分析用例的典型需求。Uber 内部的不同用例可以分为以下几个大类:
Uber 的许多工程团队使用 Ponot 为各自的产品构建定制的仪表板。Uber Eats Restaurant Manager(餐厅经理)就是其中的一个例子:
这个仪表板可以让餐厅老板从 Uber Eats 订单中获得有关客户满意度、热门菜单、销售和服务质量分析的信息。Pinot 支持以不同的方式对原始数据进行切片和分片,并支持低延迟查询,从而为餐厅老板带来丰富的体验。
类似地,我们的城市运营团队已经构建了定制的仪表板,利用 Pinot 的实时和历史数据相结合的能力,获取供需、异常事件(例如,最近五分钟内延迟的订单)、实时订单等方面的指标。这是我们日常运营的重要工具,有助于及早发现问题。
另一类用例源于作为许多后端服务的一部分执行分析查询的需求。这类用例的主要区别要求是数据的新鲜度和查询延迟,他们本质上需要是实时性的。例如,实时识别 Uber 乘客分组的地理热点对于良好的用户体验至关重要。同样,立即识别出司机取消或遗弃的 Uber Eats 购物车,可以快速采取纠正措施(以消息 / 奖励的形式)。
数据探索通常是在传统的批处理和仓库系统(如 Hadoop)上完成的。但是,有许多情况下,用户需要能够对实时数据执行复杂的 SQL 查询。例如,工程师经常需要通过加入微服务记录的各种事件来对事件进行分流。在其他情况下,实时时间可能需要与 Hive 中的批数据集连接。在 Uber 内部,我们在 Apache Pinot 之上提供了一个丰富的(Presto)SQL 接口,以开启对底层实时数据集上的探索。此外,该接口与我们所有的内部商业智能工具(如 Dashbuilder)无缝对接,这对我们所有的客户都非常有用。例如,下面是一张简单的 Sunburst 图表,显示了 Uber Eats 在五分钟内的订单明细,针对特定地区按工作状态分组。这是使用 Dashbuilder 通过在 Pinot 之上运行 Presto 查询在几秒钟之内构建的。
今天,生产中数百个关键业务用例由 Apache Pinot 提供支持。在过去的几年里,我们已经从一个 10 节点的小型集群发展到每个区域数百个节点。由 Pinot 管理的总数据占用空间已经从早期的几十 GB 增长到今天的几十 TB。同样地,每个区域的每秒查询量也增加了 30 倍(今天生产中的每秒查询量高达数千次)。
在下面的章节中,我们将详细介绍我们的平台,讨论 Uber 对 Apache Pinot 做出的独特贡献,并详细阐述在大规模运营该平台的过程中所学到的经验教训。
为了服务这样的用例,我们围绕 Apache Pinot 构建了一个自助服务平台,如下图所示:
该架构的不同组件可以分为三个阶段:
这也成为数据准备阶段,负责使数据可供 Pinot 使用。一般来说,Pinot 可从流数据源(例如 Apache Kafka)以及批处理 / 脱机数据源(例如 Apache Hadoop)中获取数据(请参阅 Pinot 文档)。在 Uber 内部中,我们添加了更多的功能,如下所述:
在某些情况下,我们需要对输入的 Kafaka 主题做一些额外的处理,然后 Pinot 才能处理数据。例如:将输入主题与另一个主题 / 表连接起来,或者对一些列值进行预聚合。
对于这种情况,我们依赖 FlinkSQL 的流处理平台(以前称为 Uber 的 AthenaX,后来回馈给 Apache Flink 社区)。它提供了一个 SQL 接口,用于表示对输入流(Kafaka)的富处理,该输入流被编译成 Apache Flink 作业,并在我们的 YARN 集群上执行。这样一个 FlinkSQL 作业的输出是另一个 Kafaka 主题,它成为 Pinot 的数据源。下面是一个简单的 FlinkSQL 作业示例,它根据设备操作系统和特定的城市 ID 过滤输入的记录。
这些经过处理的数据现在可以提取到 Pinot 中,以进一步进行切片和分片。
与实时数据源类似,脱机数据源可以按原样获取,也可以在提取到 Pinot 之前进行预处理。在 Uber 内部,我们依赖另一个名为 Piper(工作流调度系统)的平台获取脱机数据集。与 FlinkSQL 一样,Piper 作业允许用户指定一个 SQL 查询(在本例中为 Hive 查询),用于指定对原始数据所需的处理。在内部,它运行 Spark 作业来运行这个查询,从输出数据创建 Pinot 段(segment)并将其导入到 Pinot 中。如下图所示:
Piper 允许用户以给定的频率(例如,每小时或每天)安排这个作业,这反过来定义了将脱机数据集导入到 Pinot 的频率。
下面是 Apache Pinot 核心存储引擎的放大视图:
这是 Apache Pinot 以对称配置部署在两个不同地理区域的视图。如图所示,每个区域都有完全相同的组件:
每个 Pinot 集群由一个控制器(集群的大脑)、代理(查询处理节点)和服务器(数据节点)组成。Pinot 被设计为从头开始的多租户,它使我们能够将代理和服务器的特定组合分组到一个租户中:一个由特定用例拥有的隔离单元。例如,图中显示的是一个拥有两个租户的 Pinot 集群:Eats 和 Maps。在这个例子中,Maps 租户有两个代理和两个服务器,Maps 数据将均匀分布在这两台服务器上,查询处理将被限制在指定的代理中,从而将其与任何 Eats 的流量隔离开来。
在 Uber 内部,Pinot 表可以配置为:
除了核心 Pinot 存储,我们还利用了另外两个组件。
这是 Uber 使用的所有模式的集中存储库。在 Uber 内部,Pinot 大量使用这一点作为所有 Kafaka 模式的真相来源。我们添加了一个定制的 Pinot 解码器,用于在获取过程中获取所需的 Kafaka 模式,并生成相应的 Pinot GenericRow 对象,该对象反过来又用于段生成。接下来,我们还计划使用模式服务来管理 Pinot 模式。
Pinot 具有段存储的概念,用于对其不可变数据段进行归档。对于任何给定的实时或脱机的 Pinot 表,一旦数据段被密封(基于某些标准),它就变为不可变的。然后将该段存档到段存储中,以便在节点或复制失败期间进行恢复。最初的 Pinot 架构依赖于安装在 Pinot 节点上的符合 POSIX 的文件系统(比如 NFS)。通过添加使用任何通用存储系统(例如 HDFS、Ceph 或 S3)作为段存储的功能,我们对这一功能进行了扩展。有关更多详细信息,请参阅以下章节。
目前在 Uber 内部中有两种查询 Pinot 数据的方式。
Apache Pinot 有一个称为代理的组件,用于发出 REST 风格的查询。我们在代理上添加了轻量级层,称为 Pinot REST Proxy。这是一个简单的 Restlet 服务,为应用程序查询任何 Pinot 表提供了一种方便的方式。
正如前面提到的,每个 Pinot 表都有与一个租户相关联,该租户有一组唯一的代理。任何客户端应用程序都必须查询其中一个代理才能访问指定表。这就增加了一些复杂性,因为客户端应用程序需要知道其中的不同租户和代理。使用这个 Restlet 服务,客户端应用程序可以通过一些负载均衡器(在我们的例子中是 haproxy)到达任何一个 REST 代理节点。每个 Pinot REST 代理实例本地缓存 Pinot 路由信息(通过 Apache Helix 获得),它使用这些信息来标识租户、标识代理集,并以异步方式将客户端请求路由到其中一个租户。
每个 Pinot REST 代理实例中本地缓存的元数据在各种场景下都很有用。Piper(Spark)作业可以查询 REST 代理来获取表和模式信息,而不是 Pinot 控制器。这样可以减少控制器的负载,并将其与请求峰值隔离开来。
Pinot REST 代理目前正大量使用仪表板和分析应用程序的用例。
最近,我们在 Presto 和 Pinot 的集成方面做了很多工作,它允许我们的用户使用标准的 PrestoSQL 来查询 Pinot。我们最初将重点放在实时探索用例以及一些分析应用程序上。但是,经过多次优化和多个季度的生产经验之后,我们目前也在上马实时仪表板和应用程序用例。我们的长期计划是,用 Presto 取代 Pinot REST 代理。有关更多详细信息,请参阅下面关于完整的 SQL 支持的部分。
Uber 的 Pinot 团队在提高整体可靠性和查询灵活性方面有四大贡献。
在 Pinot 平台成立之初,加载新用例是一个非常手工的过程。我们的一位 Pinot 工程师必须与客户坐下来了解需求,提出一个模式、表配置,并估计服务该用例所需的容量。当然,客户开始使用 Pinot 的周转时间可能是三天到一周不等。为了让平台实现自助加载服务,我们在以下几个方面进行了投资。
模式推断
我们添加了从输入 Kafaka 主题或使用 Avro 模式创建的 Parquet 数据文件自动派生 Pinot 模式的能力。在较高级别上,这个实用程序将 Avro 字段转换为 Pinot 列类型,并自动选择其中一个字段作为 timestamp 列。在某些情况下,它还将 Avro 记录展平为各个 Pinot 列类型。这种自动转换适用于超过 80% 的 Kafaka 或 Parquet 输入数据集,节省了大量的手工操作。
FlinkSQL 推送
我们与 FlinkSQL 紧密集成,使客户能够将 Pinot 视为“数据接收器”。客户创建一个新的 FlinkSQL 作业,定义一个 SQL 转换查询,定义 Kafaka 主题的输入和输出,然后“push”(推送)到 Pinot。
在本例中,Pinot 模式是从输出 Kafaka 主题推断出来的。一旦 FlinkSQL 作业开始执行,这将在 Pinot 登台环境中自动创建一个表。
Piper 推送
类似地,我们添加了从“Hive to Avro Converter” Spark 作业的输出派生 Pinot 模式的功能,并在推送数据之前在模拟 Pinot 集群中自动创建表和模式。
每个此类表都是在一个具有最小配置的登台环境中创建的。这允许用户在几分钟内开始向 Pinot 发出查询,或者构建一个定制的 BI 仪表板。在登台阶段,表会经历几轮迭代,例如模式演变、向相应的列添加专门的索引(例如,星型树、排序或倒排)以及用户查询的验证。登台环境中的内存和磁盘使用情况可以很好地指示生产需求。我们要求每个用例在升级到生产环境之前至少要进行 24 小时的审查。
如前所述,我们已经将 Pinot 和 Presto 集成在一起,以实现对这些数据集的标准 PrestoSQL 查询。这种组合效果很好,因为我们将 Pinot 的秒级数据新鲜度与 Presto 在执行复杂查询时的灵活性结合起来。此外,谓词下推和聚合函数下推使我们能够实现此类 PrestoSQL 查询的亚秒级查询延迟,这在标准后端(如 HDFS/Hive)上是不可能做到的。请阅读我们之前发表的文章了解这项工作的详细信息:
https://eng.uber.com/engineering-sql-support-on-apache-pinot/
Pinot 最初的实时流获取 设计(又名 LLC)要求在 Pinot 控制器上安装一个本地文件系统来存储 Pinot 段。如果 Pinot 服务器已经落后于其副本或在节点故障后正在重建的情况下,它允许服务器从中央存储下载段。最初的设计通过在控制器上安装一个网络文件系统(NFS)来解决这一容量问题。
与 Pinot 的许多其他用户一样,Uber 并没有 NFS,因此不能使用原始的 LLC 设计。为解决这一问题,我们和 LinkedIn 的工程师一起增强了 LLC 协议的分段完成阶段,使其可以与深度存储或 HDFS 或 Amazon S3 之类的外部存储服务一起使用。如今,Uber 的 Pinot 实时获取管道使用 HDFS 作为其深度存储,其中有数百个来自 Pinot 的段。
我们的团队发现了 Pinot 模式演变中的一个重要问题。对于实时 Pinot 表(具体地说,从流数据源获取),并不完全支持向现有模式添加新列。尽管较旧的数据段准确地反映了这一点,但新列在最活跃的数据段中并不可见,从而导致查询失败。我们对这一关键问题的解决方法,可以在这里找到:
https://github.com/apache/incubator-pinot/issues/4225
当我们在 Uber 内部扩展 Pinot 的用例时,我们学到了很多东西。本节中的许多经验教训来自于解决在操作、部署、内存管理和监控方面遇到的难题。
易操作性对于扩大 Pinot 的使用范围至关重要。当更多的用例加入到 Pinot 集群时,我们希望将集群管理的开销降到最低限度。谢天谢地,Pinot 有几个现成的功能,使操作和管理变得更容易。
大规模运行分析系统的核心需求之一是多租户。Pinot 提供了原生的 多租户 支持,并且在操作中,特别是在减少停机方面显示了巨大的价值。借助一流租户的支持,这些表可以在裸机上组合在一个单一租户名下,并分配给该租户的主机。这提供了强大的隔离性,并避免了嘈杂的邻居问题。当出现问题时,如对服务器进行错误的查询时,我们可以限制对其租户的影响,而不会违反其他租户的 SLA。
Pinot 提供了段分配策略,因此段可以在主机之间均匀分配,包括新添加的主机。这大大简化了集群 / 租户扩展工作。我们所需要做的就是提供一个新的 Pinot 服务器主机,并将其添加到所需的租户。它将自动开始获得新的段。
Uber 在开源 Pinot 基础上改进的一个方面是增加了与租户相关的 JMX 指标。在 Uber 这样的规模上,我们的 Pinot 集群有用大约 1000 张表,数百台 Pinot 服务器或代理。这使得 Pinot 管理员很难进行监控;一个典型的场景是,一个表有查询性能问题,但这可能是由同一租户上的其他一些表引起的。使用 Pinot 租户指标,可以将服务器分组为十几个租户,以检查每个租户的资源使用情况和查询性能。它使 Pinot 服务器 / 代理和问题分类变得更加容易。
Pinot 提供的另一个有价值的特性是段存储(这里提到过),如远程 HDFS 集群或云存储。这一特性大大减少了更换服务器节点所需的操作工作量,有助于处理大型计算机池中发生的硬件故障。通过深度存储备份,新增加的主机能够在没有人工干预的情况下即可下载和恢复数据,并在段完全下载后自动为流量提供服务。
通常,给定的 Pinot 控制器或服务器可能会经历垃圾回收(Garbage Collection,GC)暂停。根据垃圾回收暂停的严重程度,这可能会导致性能下降或宕机。例如,如果引导者控制器持续经历完整的垃圾回收暂停,它就无法创建新段,从而停止对该集群中的所有表的提取。如果 Pinot 服务器经历了完整的垃圾回收,它可能会导致查询延迟峰值和属于该节点的所有表中的查询结果不一致。以下是这些问题背后的典型原因。
在 Pinot 之上启用 Presto 查询后,我们注意到,错误的查询通常会使 Pinot 服务器不堪重负。例如,如果用户试图在很长的时间范围内(或没有任何时间范围谓词),这将导致 Pinot 服务器耗尽大量内存(我们的设置使用内存中的索引而不是 mmap),最终导致垃圾回收完全暂停。有几种方法可以将此类大型赛秒的影响降至最低:
通常,为了避免这类问题,我们将这种特殊查询用例分离到一个单独的租户。
Pinot 控制器是整个 Pinot 集群的控制平面。按照设计,它需要有限的堆空间,因为它不承载或提供数据。当我们的 Pinot 集群变得越来越大,有了更多的表或更多的段时,我们发现,Pinot 控制器的堆使用量大大超过了默认值 4GB。随后发生了主要的 Java 垃圾回收事件,这对整个集群造成了破坏。通过对 Pinot 控制器的堆分析,我们发现这个问题与 Pinot 控制器中使用的 Helix 库保持的直方图指标有关。直方图指标使用默认的 1 小时滑动窗口,这意味着对于繁忙的生产控制器来说,内存中将保留太多的事件数据点。我们对 Apache Helix 提供了一个补丁,以便可以配置滑动窗口长度来减少内存占用。
随着各个 Uber 团队使用各种各样的 Pinot 索引和查询模式,我们还看到了内存开销的其他情况:
由于 Pinot 集群的规模和整体数据量,到目前为止,我们在段管理方面遇到了一些挑战。
随着数据规模的不断扩大,我们也经历了由于段过多而导致的一些问题。用于集群管理的 Pinot 利用 Apache Helix 而不是 Apache ZooKeeper。例如,当服务器从脱机转换为在线时,Pinot 将通过 Helix 传播状态转换消息以通知其他实例。这种状态转换消息的数量与服务器上的段数量成正比。当一个服务器承载太多的段时,Helix 上可能会出现一个状态转换消息的尖峰,从而导致大量的 Zookeeper 节点。如果 Zookeeper 阶段的数量超过缓冲区阈值,Pinot 服务器和控制器将会崩溃。为了解决这一问题,我们在 Pinot 控制器中添加了 消息节流,以消除状态转换浪涌。
我们面临的另一个挑战是,由于段分配策略对热点的潜在影响。默认情况下,Pinot 通过为分配最少的主机分配一个新段来平衡服务器之间的段。因此,在集群扩展的情况下,可以在新添加的服务器中创建最近的段。为环节这一问题,我们在集群扩展后运行表以重新平衡。
在段深度存储的操作过程中,我们发现当前的 LLC 协议存在两个主要问题:
第一个问题特别严重,因为我们的许多用户希望在数据先限度方面有较高的 SLA(对于第 99 个百分位数,少于 5 分钟)。在 Uber 内部,我们已经见过一些情况,HDFS 由于维护或宕机而无法使用长达一个小时左右的情况。这违反了我们所有重要实时表的 SLA。实际上,HDFS 有自己的 SLA,并且可以独立于 Pinot 发生故障。为解决这个严格的依赖关系问题,我们提出了对 LLC 的 重大改进,以便即使深度存储停机长达几个小时,它也可以继续实时获取。在深度存储停机期间,该方案利用对等服务器存储来下载段。该方案已经得到了社区的批准,代码已经完成,目前正在测试中。
总体而言,我们在使用 Apache Pinot 方面的经验非常棒。在 Uber 内部,它已经成为解决大规模实时分析用例的关键技术。高效的内存索引和列压缩有助于降低存储成本。内置的多租户特性以及节点和租户易于维护,运行成本低。此外,围绕着 Pinot 的 Apache 社区非常热情,参与度很高。我们将继续投资 Pinot,并计划在未来的项目中与社区合作,如 Pinot Upserts、联合段存储和查询、智能索引等。
如果你有兴趣了解更多关于 Apache Pinot 的信息,请访问以下资源:
作者介绍:
Yupeng Fu,Uber 数据团队软件工程师,领导多个流媒体团队构建可扩展。可靠和高性能的流媒体解决方案。他是 Apache Pinot 的贡献者。
Girish Baliga,在 Uber 管理 Pinot、Flink 和 Presto 团队。目前正在帮助团队构建基于 Pinot 的全面自助实时分析平台,为关键业务的外部仪表板和指标提供支持。他是 Presto Linux 基金会管理委员会的主席。
Ting Chen,Uber 数据团队软件工程师,流分析团队的技术主管,其任务是为 Uber 产品和客户端提供快速、可靠的实时见解。他是 Apache Pinot 的贡献者。
Chinmay Soman,曾是 Uber 数据团队软件工程师。他曾领导流媒体平台团队,其任务是为 Uber 的所有消息传递、流媒体处理和 OLAP 需求构建一个可扩展的平台。他是 Apache Pinot 的贡献者。
原文链接:
https://eng.uber.com/operating-apache-pinot/