对于许多数据科学家来说,构建和优化机器学习模型的过程只是他们每天工作的一小部分。他们的绝大多数时间都花在执行 ETL、构建数据管道和将模型投入生产等不那么光鲜(但至关重要)的工作上。
在本文中,我们将逐步介绍构建生产环境数据科学管道的过程。在此过程中,我们将演示 Delta Lake 如何成为机器学习生命周期的理想平台,因为它提供了统一数据科学、数据工程和生产工作流的工具和特性,包括:
能够连续处理来自历史和实时流源的新数据流的表,极大地简化了数据科学生产管道。
模式执行,确保表保持干净整洁,不受列污染(column contamination),并为机器学习做好准备。
模式演化,它允许向现有数据表添加新列,即使生产环境正在使用这些表,也不会导致破坏性更改。
时间旅行,也就是数据版本控制,允许对任何 Delta Lake 表的更改进行审计、再现,甚至在由于用户错误而发生意外更改时,还可以根据需要回滚这些更改。
与 MLflow 集成,通过自动记录实验参数、结果、模型和图表来跟踪和再现实验。
Delta Lake 的这些特性使数据工程师和科学家能够比以往更快地设计出可靠、有弹性的自动化数据管道和机器学习模型。
一种常见的架构是,使用对应于数据工程管道中不同质量级别的表,逐步向数据添加结构:数据摄取(“Bronze”表)、转换 / 特征工程(“Silver”表)和机器学习训练或预测(“Gold”表)。把这些表组合在一起,我们称之为“多跳”架构。它允许数据工程师构建一个管道,以原始数据作为“单一的真理来源”,所有的东西都从原始数据开始流动。后续的 转换 和聚合可以重新计算和验证,以确保业务级聚合表仍然反映底层数据,即使下游用户对数据进行加工并引入特定于上下文的结构。
为了理解 Delta Lake 管道是如何工作的,我们有必要更深入地研究下数据与水的类比(如果您允许我们使用扩展的示例)。Delta Lake 不需要调度一系列不同的批处理作业来使数据分阶段地通过管道,而是允许数据像水一样流动:无缝地、持续地、实时地。
Bronze 表是典型的湖泊,大量的水(数据)源源不断地流入。当它到达时,它是脏的,因为它来自不同的来源,其中一些不那么干净。从那里,数据源源不断地流入 Silver 表,就像一条与湖泊相连的小溪的源头,快速不断地流动。当水(在我们的例子中是数据)顺流而下时,迂回曲折的河流会对它进行净化和过滤,它在流动的过程中会变得更加纯净。当它到达下游的水处理厂(我们的 Gold 表)时,它会接受一些最终的净化和严格的检测,以使其可以被消费,因为消费者(在本例中是 ML 算法)非常挑剔,不会容忍受污染的水。最后,它从净化工厂通过管道进入每个下游消费者的水龙头(无论是 ML 算法,还是 BI 分析师),准备好以最纯净的形式供消费。
为机器学习准备数据的第一步是创建一个 Bronze 表,在这里可以以最原始的形式捕获和保存数据。让我们看看如何做到这一点——但首先,让我们讨论一下为什么 Delta Lake 是 数据湖 的首选。
目前,我们看到的最常见的模式是,公司使用 Azure Event Hubs 或 AWS Kinesis 收集实时流数据(比如客户在网站上的点击行为),并将其保存到廉价、充裕的云存储中,比如 Blob 存储或 S3 存储桶。通常,公司希望使用历史数据(比如客户过去的购买历史)来补充实时流数据,以获得过去和现在的完整图景。
因此,公司往往会从各种来源收集到大量原始的、非结构化的数据,而这些数据却停滞在数据湖中。如果无法将历史数据与实时流数据可靠地结合起来,并为数据添加结构,使其能够被输入机器学习模型,这些数据湖很快就会变得错综复杂、杂乱无章,这就是“数据沼泽”一词的由来。
在转换或分析单个数据点之前,数据工程师已经遇到了他们的第一个难题:如何将(“批”)历史数据的处理和实时流数据结合起来。通常,可以使用 lambda 架构 来弥补这一差距,但这本身就存在一些问题,这些问题源于 lambda 的复杂性,以及它导致数据丢失或损坏的倾向。
解决“数据湖困境”的办法是利用 Delta Lake。Delta Lake 是一个位于数据湖之上的开源存储层。它是为分布式计算而构建的,百分之百兼容 Apache Spark,因此,你很容易转换现有的数据表,不管它们以什么格式存储(如 CSV、Parquet 等),并使用你喜欢的 Spark API 将它们保存成 Delta Lake 格式的 Bronze 表,如下所示。
注意,在本文中,我们将分析 Lending Club 提供的数据集,可以从这里下载:
https://www.kaggle.com/wendykan/lending-club-loan-data
# Read loanstats_2012_2017.parquet
loan_stats_ce = spark.read.parquet(PARQUET_FILE_PATH)
# Save table as Delta Lake
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTALAKE_FILE_PATH)
# Re-read as Delta Lake
loan_stats = spark.read.format("delta").load(DELTALAKE_FILE_PATH)
一旦你创建了一个存储原始数据的 Bronze 表,并将现有的表转换为 Delta Lake 格式,你已经解决了数据工程师的第一个困境:结合过去和现在的数据。如何解决?Delta Lake 表可以无缝地处理来自历史和实时流源的连续数据流。而且,由于它使用 Spark,所以它近乎兼容所有不同的流数据输入格式和源系统,比如 Kafka、Kinesis、Cassandra 等。
为了说明 Delta Lake 表可以同时处理批数据和流数据,请看下面的代码。这段代码从文件夹 DELTALAKE_FILE_PATH 将初始数据集加载到 Delta Lake 表中(如上文代码块所示),在将新数据流入表之前,我们可以使用 SQL 友好的语法在当前的数据上运行一个批处理查询。
%sql
SELECT addr_state, SUM(`count`) AS loans
FROM loan_by_state_delta
GROUP BY addr_state
如上所见,最初,加州和德州的贷款数最高。
现在,我们已经演示了 Delta Lake 运行批数据查询的能力,下一步我们将展示其同时在流数据上运行查询的能力。
我们将创建一个流数据源,不断将新数据添加到 Delta Lake 表中,并混合我们前面绘制过的已有的批数据。注意,和之前的批查询代码块一样,loan_by_state_readStream 从同一位置(即 DELTALAKE_FILE_PATH 文件夹)读取。
loan_by_state_readStream = spark.readStream.format("delta").load(DELTALAKE_FILE_PATH)
loan_by_state_readStream.createOrReplaceTempView("loan_by_state_readStream")
实际上,批数据和流数据可以在同一位置(例如 DELTALAKE_FILE_PATH),而 Delta Lake 可以同时响应两种类型数据的查询,因此才有一个说法,Delta Lake 表提供提供了“统一的批数据和流数据源以及数据消费(sink)。”
在 Delta Lake 处理流时,可视化更新就在我们眼前,我们开始看到一个不同的模式出现。
如你所见,最近的数据流导致爱荷华州(中西部的州颜色越来越深)的贷款最多。即使使用 loan_by_state_readStream 将新数据并发地流进表中,loan_by_state_delta 表也会被更新。
既然我们已经看到了 Delta Lake 允许我们同时对批量数据和流媒体来源的数据进行可靠地分析,下一步是做一些数据清洗、转换和特征工程,为机器学习工作做好准备。
到目前为止,我们已经成功地将我们的数据转换为 Delta Lake 的格式,并创建了一个 Bronze 表作为无缝处理历史数据和实时数据的着陆区。目前,数据已就位,但当前的形式还远远谈不上有用:在可以用于机器学习模型之前,它需要大量的清洗、转换和结构化。ML 建模库没有提供(如果有的话)与数据类型、空值和缺失数据相关的灵活性,所以数据工程师接下来的工作是清理和处理原始数据。由于 Delta Lake 百分之百兼容 Apache Spark,我们可以在我们的 Delta Lake 表上使用熟悉的 Spark API 对核心内容进行数据再加工,如下所示。
print("Map multiple levels into one factor level for verification_status...")
loan_stats = loan_stats.withColumn('verification_status', trim(regexp_replace(loan_stats.verification_status, 'Source Verified', 'Verified')))
print("Calculate the total amount of money earned or lost per loan...")
loan_stats = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))
在执行完 ETL 之后,我们可以将清洗、处理过的数据保存到一个新的 Delta Lake Silver 表,这使得我们无需修改原始数据,就可以将结果保存为一个新表。
中间的 Silver 表很重要,因为它可能作为下游多个 Gold 表的数据来源,由不同的业务单位和用户控制。例如,你可以想象一下,一个代表“产品销售”的 Silver 表流入有几种不同用途的 Gold 表,比如,更新供应链仪表板、计算销售人员的工资奖金或为董事会成员提供高层次的 KPI。
我们不直接将 Gold 表与 Bronze 表中的原始数据连接起来的原因是,这会导致大量重复的工作。这将要求每个业务单元对其数据执行相同的 ETL。相反,我们可以仅执行一次。还有一个附带的好处,这一步可以避免由于数据分流而导致的混乱,像不同的业务单位计算相同的指标但又略有不同。
按照这个模型,我们就可以保证,保存或流入最后的 Gold 表的数据是干净、合规且一致的。
现在,我们已经转换了我们的数据,下一步是通过模式执行把结构引入我们的 Delta Lake Silver 表。模式执行(Schema enforcement)是数据科学家和工程师的一个重要特点,因为它能确保我们能够保持表的整洁。没有模式执行,单个列中可能会有不同的数据类型混在一起,对我们的数据可靠性造成了损害。例如,如果我们不小心把 StringType 类型的数据引入了一个 FloatType 数据类型的列,我们可能会无意中使我们的机器学习模型无法读取列,破坏我们宝贵的数据管道。
Delta Lake 提供了写入模式验证,这意味着 Delta Lake 会在将新记录写入一个表时进行检查,以确保这些记录匹配表上预定义的模式。如果记录不匹配表的模式,Delta Lake 将会引发一个异常,防止不匹配的数据污染数据类型存在冲突的列。这个方法比读取模式验证更好,因为一旦列已经被不正确的数据类型所污染,就很难再“把魔鬼重新放回瓶子里”。
Delta Lake 使得定义模式很容易,使用下面的代码执行模式。注意传入的数据如何被拒绝,因为它们与表的模式不匹配。
# Generate sample loans with dollar amounts
loans = sql("select addr_state, cast(rand(10)*count as bigint) as count, cast(rand(10) * 10000 * count as double) as amount from loan_by_state_delta")
display(loans)
# Let's write this data out to our Delta table
loans.write.format("delta").mode("append").save(DELTALAKE_SILVER_PATH)
// AnalysisException: A schema mismatch detected when writing to the Delta table.
如果错误不是由于一个列包含了错误的数据类型所导致,但是因为我们(故意)添加了一个没有在当前模式中体现的新列,我们可以添加列,然后使用模式演化纠正这个错误,我们稍后会解释。
一旦数据已经通过模式执行达到这个阶段,我们可以将其以最终形式保存在 Delta Lake Gold 表中。现在,数据已经经过彻底地清洗、转换,并且已经准备好供我们的机器学习模型使用——它们对数据结构非常挑剔!在将数据从原始状态流入 Bronze 表和 Silver 表的过程中,我们已经建立了一个可再现的数据科学管道,它可以使所有获取到的新数据进入 ML 就绪状态。这些流可以是低延迟或手动触发的,消除了传统管道所需的调度和作业管理。
现在,我们已经转换了我们的数据,并通过模式执行添加了结构,我们已经准备好开始运行试验,并使用我们的数据建立模型。这就是数据科学中的“科学”真正发挥作用的地方。我们创建零和替代假说,构建和测试模型,衡量我们的模型对因变量的预测有多好。事实上,这个阶段就是我们中的许多人闪光的时候!
数据科学家需要能够进行可再现的实验。再现性是所有科学探究的基础:如果观察结果不能测试、重复测试和复制,它是不可能进一步接近真相的。然而,当有这么多不同的方式可以处理相同的问题时,我们中有谁是严格线性推进的?
毫无疑问,我们中有很多人认为,我们处理事情的方式有点“神奇”,我们到达目的地的方式是沿着不确定和迂回的路线进行调查和探索。这没问题——只要我们使用的工具允许我们展示我们的工作、追溯步骤、留下面包屑——向缺乏仔细思考的想法中添加点科学的方法,如果你愿意的话。借助 Delta Lake 的时间旅行和 MLflow,上述一切皆成为可能,你甚至可以做更多。
对于数据科学家,Delta Lake 最有用的功能之一是能够使用数据版本控制回到过去,或者说“时间旅行”。Delta Lake 按顺序维护着在任何 Delta Lake 表上执行的每个操作的日志,所以如果你可以根据需要恢复到早期版本的表,撤销一项意想不到的操作,或者只是看看你的数据在特定时期的情况。
使用时间旅行从表的早期版本中选择数据很容易。用户可以查看表的历史,使用一个版本历史号(如以下代码所示,当选择表 loan_by_state_delta VERSION AS OF 0)或时间戳查看数据在那个时间点的状态。
%sql
DESCRIBE HISTORY loan_by_state_delta
要选择表的以前版本,你可以使用熟悉的 SQL 语法,如下所示:
%sql
SELECT * FROM loan_by_state_delta VERSION AS OF 0
除了使用表的版本号之外,你还可以使用一个时间戳获取数据快照,看看数据在一个特定的时间点是什么样子。
%sql
SELECT * FROM loan_by_state_delta TIMESTAMP AS OF '2019-07-14 16:30:00'
搭配 MLflow(下面讨论),Delta Lake 的时间旅行可以确保你执行的所有转换和试验都是可追踪、可再现、可逆的。它可以用来:
重新创建数据集或表在特定时间点的状态(创建数据“快照”)。
重建和验证训练和测试数据集,再现试验。
回滚表中任何意外的更改或转换。
顺序事务日志创建了一个可查证的数据血统,这对 GRC(治理、风险和合规)应用程序特别有用。对于像 GDPR 和 CCPA 这样的监管法规,公司需要有能力证明数据被正确删除或匿名化(集体或个人层面)。更新、合并、删除、插入等都可以出于审计目的来确认和验证。
最后,在得知像无意中删除行或算错列等这样的人为错误百分之百可以使用时间旅行撤销时,数据工程师就更容易入眠了。著名的墨菲定律指出,如果任何事情有可能出错,它就会出错,数据管道也不例外——由于人为错误,错误不可避免会出现。与硬件故障相比,丢失数据的情况更可能因为有人无意中编辑了一个表而发生,而这些错误是可以撤销的。
事务日志提供帮助的另一种方式是在调试错误时,你看——你可以回到过去,找出一个问题是如何产生的,并修复问题或还原数据集。
MLflow 是一个与 Delta Lake 搭配使用的开源 Python 库,使数据科学家能够毫不费力地记录和跟踪指标、参数、文件和镜像工件。用户可以运行多个不同的试验,根据需要修改变量和参数,并且知道,输入和输出都已经写入日志记录下来。你甚至可以自动保存训练模型,因为你要尝试不同的超参组合,这样,一旦你选择出表现最好的模型,模型权重已经保存并准备好供我们使用了。
在 Databricks,MLflow 自动启用 MLR 5.5,你可以使用 MLflow Runs Sidebar 查看 MLflow 运行,如下所示。
数据工程师和科学家经常会发现,从开始构建数据管道比维护容易。由于不断变化的业务需求、业务定义、产品更新和时间序列数据的性质,表模式不可避免地会随着时间的推移不断变化,所以重要的是要使用的工具使这些变化更容易管理。Delta Lake 提供的工具不仅可用于模式执行,也可以通过 mergeSchema 选项用于模式演化,如下所示。
# Add the mergeSchema option
loans.write.option("mergeSchema","true").format("delta").mode("append").save(DELTALAKE_SILVER_PATH)
%sql
-- Review current loans within the `loan_by_state_delta` Delta Lake table
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10
通过在查询中添加.option (“mergeSchema”、“true”),任何出现在 DataFrame 中而不在 Delta Lake 目标表中的列都会自动作为写入事务的一部分添加。数据工程师和科学家可以使用这个选项向现有的机器学习生产表中添加新列(也许是一个新的指标跟踪或本月的销售金额),而不破坏现有的依赖于旧列的模型。
MLflow 已在后台记录我们的参数和结果,我们准备将我们的数据分为训练集和测试集,并训练我们的机器学习模型。我们已经从 Silver 表中获取经过转换的数据,并执行模式以确保所有进入最终表的数据合规、无误,从而创建了 Gold 表,我们将在其上训练我们的模型。现在,我们已经使用我们之前介绍的“多跳”架构建立好了管道,使新数据不断流入我们的管道,然后加工和保存在中间表里。
为了完善机器学习生命周期,我们将使用下面代码片段中的标准和交叉验证建立一个 GLM 模型网格。我们这里的目标是预测借款者是否会在给定的贷款上违约。
在这里查看完整代码:
https://pages.databricks.com/rs/094-YMS-629/images/02-Delta%20Lake%20Workshop%20-%20Including%20ML.html
# Use logistic regression
lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "scaledFeatures")
# Build our ML pipeline
pipeline = Pipeline(stages=model_matrix_stages+[scaler]+[lr])
# Build the parameter grid for model tuning
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
# Execute CrossValidator for model tuning
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=5)
# Train the tuned model and establish our best model
cvModel = crossval.fit(train)
glm_model = cvModel.bestModel
# Return ROC
lr_summary = glm_model.stages[len(glm_model.stages)-1].summary
display(lr_summary.roc)
得到的受试者工作特征(ROC)曲线如下图所示。
然后,我们将这个模型和完整代码笔记本(这里)中的其他几个广义线性模型做比较。在选出最好的模型(一个 XGBoost 模型)后,我们用它来预测我们的测试集,然后基于每个正确或错误分类绘制出我们节省或损失的钱数。在数据科学家看来,像这样以确凿的美元和美分表示你的分析总是一个好主意,因为它让你的结果具体而容易理解。
display(glm_valid.groupBy("label", "prediction").agg((sum(col("net"))).alias("sum_net")))
这里 有一篇更深入的博客文章,其中的示例使用了 Scala。
Delta Lake 非常适合于机器学习生命周期,因为它提供了一些特性,统一了数据科学、数据工程和生产工作流。它实现了从原始数据到结构化数据的连续数据流,允许在新输入的数据上训练新的 ML 模型,而现有的生产模型仍在提供预测服务。它提供了模式执行(确保数据格式正确,可以为数据机器学习模型所处理)和模式演化(防止模式变化破坏现有的生产模型)。最后,Delta Lake 提供的“时间旅行”,即基于有序事务日志的数据版本控制,允许根据需要审计、再现、甚至回滚数据。
查看英文原文:
https://databricks.com/blog/2019/08/14/productionizing-machine-learning-with-delta-lake.html
你也「在看」吗?👇