斯文骏: 我们最初开发 PyODPS 的目的是为了方便数据开发者使用 MaxCompute,其出发点和 Mars 是一致的,都是能让数据开发者使用与 Python 通用 Library(Numpy、Scipy、Pandas)类似的用法来使用 MaxCompute。但在开发和推广 PyODPS 的过程中,我们发现 PyODPS 存在不少局限性。PyODPS 仅仅在用户端执行,其工作原理是将用户表达式编译为 SQL,因而对于矩阵运算和 Pandas 中的 Index 变换等操作存在实现困难。即便上述操作可以使用 SQL 编写,SQL Engine 对其执行效率也难以保证。因此,我们决定从更底层出发,开发一套兼容 Python 生态要求的分布式 Library,且支持在 MaxCompute 中运行,这便是 Mars。
相比 PyODPS 只支持 DataFrame,且 DataFrame 与 Pandas DataFrame 存在一定的差异。Mars 直接依照 Numpy、Scipy 和 Pandas 接口编写,因而更符合 Python 开发者的习惯。
斯文骏:Numpy、Scipy 等计算库能够非常高效地在单机上进行数值计算,这也是 Python 为什么能够在数据分析领域迅速增长的重要原因。但随着数据规模的不断扩大,现有单机库逐渐难以应对在如此规模的数据中进行数据分析的需求。而分布式计算库需要在传统科学计算库之上加上分布式计算引擎的能力,具体包括:
根据计算的特点对数据进行拆分。例如,矩阵乘法和 SVD 在分布式计算的场景中,对于数据切分的要求不同。进行数据拆分和物理执行图的建立都是分布式计算库的职责;
执行过程中,执行图常常深度较大,且 Barrier 较少,调度器需要利用这一特性以减少中途的 IO 和等待代价;
分布式计算引擎共有的要求,例如计算、IO 的高效并行化,故障恢复,等等。
斯文骏:Mars 需要实现的是细粒度图调度和执行。用户使用 Mars 类型编写代码后,将形成一张逻辑上的粗粒度图。该 Graph 在提交到 Scheduler 后,会形成一张细粒度图,此后细粒度图的节点将被分配到各个 Worker 中执行。
我们基于自己实现的 Actor Model 搭建分布式调度服务。Actor Model 使用 Gevent 实现异步操作,并包装所有通信,包括进程间和机器间的调用。每个 Mars 服务都由多个进程组成,以减少 Python GIL 对执行效率的影响。为减少大块数据在 Worker 内各进程间的复制成本,Mars 使用 Arrow 进行序列化,并使用 Plasma Store 在各进程间共享所需的数据。当 Plasma Store 充满时,会 Spill 一部分旧数据到磁盘以使计算能继续进行下去。
Mars 目前支持 Worker 进程以及 Worker 两级 Failover。当 Worker 进程 Fail,Worker Daemon 会重建进程,并重跑受影响的节点。当整个 Worker Fail,Scheduler 会收到通知,根据丢失的 Worker 以及数据血缘关系确定需要重跑的节点,重新分配初始节点并将相关节点再次提交执行。
斯文骏:Numpy 是非常强大的 Python 数值计算库,也是 Python 数值计算的事实标准。Mars 对 Numpy 的兼容并不意味着要替换 Numpy,而是在大规模分布式计算的情景下为用户提供科学计算的能力,这一能力在单机上的执行仍然需要依赖 Numpy、Scipy、Pandas 或者 API 类似 Numpy、Pandas 的 Cupy、Cudf 等单机库。当然,Numpy 并不是在所有情形下执行效率都是最优的,所以会有 Numexpr/Numba/Jax 等在 Numpy 基础上实现加速的 Library,而使用 Mars 在某些场合下也能达到这样的效果。
斯文骏:Mars 和 Dask 都是使用 Python 编写的以离线数据分析为目标的分布式并行计算库,且都拥有和 Numpy/Pandas 相近的 API。但 Mars 和 Dask 在设计思路上有明显的差异。
表达式和计算图方面,Mars 构建了一整套 Tensor/DataFrame 表达方式,采用 Protobuf/JSON 记录计算图,这使得 Mars 可以使用非 Python 客户端提交作业,也可以方便地采用 Numexpr/Cupy 等库对不同运算符采用个性化的优化。而 Dask 则采用 pickle 序列化 Python Dict 及 Python Function 的形式,对客户端 / 服务端的 Python 及相关 Library 的版本一致性要求很高,同时优化难度也较高。
分布式框架方面,Mars 实现了自己的轻量化 Actor 模型,并在此基础上搭建 Scheduler 和 Worker。Mars 支持多个 Scheduler 以降低单点负载,同时通过多进程减少 Python GIL 的影响。而 Dask 则使用线程模型,单机调度效率较低。目前 Benchmark 的结果,Mars 单机执行效率全面高过 Dask,分布式执行中的大规模矩阵乘法等作业执行效率可达到 Dask 的 3 倍以上。同时,Mars 还支持进程和 Worker 级别的 Failover,使作业执行更加可靠。
Ray 主要是为增强学习开发,提供了一套非常灵活的高效率分布式执行框架,但其并未直接提供分布式 DataFrame 等支持,也没有提供图优化、调度策略以及磁盘 Spill 等支持,用户上手会比较困难。现有基于 Ray 的 Modin 等库虽然提供了 DataFrame 功能,但也存在计算规模较低等问题。
斯文骏:Spark 和 Mars 在调度方式上是有显著差异的。Spark 的 DAG 是一种粗粒度图,两个节点内的各 Partition 根据 Narrow 或者 Wide Dependency 建立一对一的连接或者全连接。Spark 根据 Wide Dependency 切分 DAG 为 Stage,每个 Stage 需要依次执行。而 Mars 的 DAG 则是细粒度图,每个 Chunk 可以有自己的依赖。这就意味着 Mars 在执行过程中,可以更精细地控制每个 Chunk 的执行和数据释放,从而获得更大的并行度和更高的效率。
在很多场景中,全连接事实上是不必要的。 以矩阵乘法为例,分布式矩阵乘法可分为 Chunk 相乘阶段和乘积累加阶段。在 Chunk 相乘阶段,Tensor a 中的每个 Chunk 并不是与 Tensor b 中的所有 Chunk 相乘,其所需的连接个数仅与 Tensor b 的某个维度的大小相同。因而,在执行中,无需等待所有 Chunk 乘法执行完成,即可执行后续的加法,待加法完成后,加法所依赖的乘积即可被释放,用于其他计算。因而 Mars 所采用的细粒度调度可以有效减少 IO 和 Stage 等待的开销,从而拥有更高的执行效率。
斯文骏:Mars 和 TensorFlow/PyTorch/MXNet 等机器学习框架的出发点不同。Mars 的出发点在于解决数据分析的规模问题。TensorFlow 等机器学习框架的关注点不在规模,而在如何方便地实现机器学习 / 深度学习算法。 其提供的分布式功能需要用户自行指定各个节点的职能。为构建生态,Mars 正在考虑引入上述框架,使用户可以使用 Mars 产生的数据进行后续的机器学习,并将结果回流到 Mars。
JAX 是一套 Numpy 加速库,Mars 正在尝试引入以提高执行速度。
斯文骏:Mars 可以独立部署执行,也可以使用 PyODPS 通过 MaxCompute 调度执行,从 MaxCompute 表中读取数据,执行结果回流到表中。目前 MaxCompute 中运行 Mars 处于内测阶段。
斯文骏: 自从开源以来,Mars 已经从 0.1 演进到 0.2,目前主干代码为 0.3。自开源以来的主要更改包括:
斯文骏:Mars 目前所有开发、CI 和 Code Review 都在 Github 上进行,目前主要是 5 位阿里内部开发者参与 Mars 的开发。近一年来我们工作的重心主要在 Mars 本身的完善而非推广,阿里内部和外部的使用主要是试用性质。近期我们会加大推广的力度。
斯文骏: 技术上,Mars 未来会继续增强对 Numpy/Pandas API 的支持,进一步提高执行效率,同时建立接口以方便与其他数据开发 / 机器学习工具交换数据。Mars 也会加强社区建设,近期将提供一系列代码解析文章,并通过建立开发者讨论组等形式加强开发者之间的联络。
采访嘉宾: 斯文骏,阿里巴巴计算平台事业部技术专家,硕士研究生毕业后加入阿里巴巴,先后参与机器学习平台和 PyODPS 开发,2017 年参与启动 Mars 开发,为 Mars 分布式引擎主要开发者之一。
在 QCon 上海 2019 的演讲中,斯文骏老师将基于 Mars 以往的实践介绍系统架构和提升执行效率方面的实践。更多精彩案例请关注 QCon 上海 2019 ,大会日程现已上线,点击 「 阅读原文 」了解详情。目前大会 9 折报名中 ,现在报名立减 880 元,团购可享更多优惠!有任何问题欢迎联系票务小姐姐 Ring ,电话:13269076283 微信:qcon-0410。
点击下方图片即可阅读
10个不为人知的SQL技巧
你也「在看」吗?👇