使用Dataflow进行ETL开发生命周期

2024年08月09日 由 alex 发表 80 0

简介

在本文中,我们将探讨Dataflow推荐的测试和部署方法的好处。我们将展示数据工程师团队如何同时在相同的ETL代码集上工作,并能够保持他们的代码独立、易于测试,并能够毫不费力地将其更改与生产环境合并。


"Git解决方案"

我们中的一些人可能还记得以前将ETL代码推送(或保存)到公司中存在的任何版本控制或网络驱动系统中的时代。它可能只是将某种SQL保存到编辑器窗口中,并将其命名为像bob_etl_v1.sql这样的内容,以确保保留所有先前的版本并确保看到前缀的每个人都不会覆盖你的代码。那些日子幸运地已经过去了,但是适当的版本控制和其他软件工程实践只有在近年来才进入数据工程领域。


其中有一个工具在这方面迄今为止被最广泛使用:git。有了git的一点帮助,任何人都可以自由地在本地分支上开发代码,然后在确保代码没有语法、逻辑和性能问题后,安全地将其与生产代码合并...或者我们希望它那么简单。


事实是,将你正在处理的ETL代码安全地测试在一个独立的环境中并不总是简单的。或者将你的转换逻辑(特别是写在SQL中的逻辑)作为如果是在生产模式下运行。这里的常见问题是SQL逻辑经常是参数化的,有时参数是动态生成的,并且输出必须与当前生产环境隔离开来。准备一个设计良好的测试流水线及其环境通常会非常麻烦,以至于有时候数据工程师只能摇摇欲坠并在生产环境中进行测试。


显然,版本控制工具或SQL模板化并没有什么革命性,但让我们来看看与稳定的CI/CD框架相结合时它们可以产生的效果。以下是Dataflow提供的相关功能列表,可直接使用,我们将在本文中讨论:

  • 分支驱动的配置
  • 单元测试框架(带有数据集模拟)
  • 集成测试框架
  • ci/cd定制钩子


分支驱动的配置

这是使你的Dataflow项目易于开发和测试的核心元素。让我们来看看某个随机存储库中的示例Dataflow配置。我们想要向你展示的文件名为dataflow.yaml,它通常位于存储库的根目录中:


~/random-dataflow-repo$ cat dataflow.yaml
[...]
defaults:
  workflows:
    scheduler: sandbox
    variables:
      TARGET_DB: ${dataflow.username}
branches:
  staging:
    workflows:
      scheduler: production
      variables:
        TARGET_DB: staging
  main:
    workflows:
      scheduler: production
      variables: 
        TARGET_DB: prod


现在让我们回顾一下上面的配置文件。其内容定义了以下模式:

  • 如果你正在一个名为staging的git分支上操作,则: → Dataflow将确保将你的工作流部署到计划程序¹生产群集上, → 并将TARGET_DB工作流变量设置为staging,因此所有输出表都在该数据库中创建(和访问)。
  • 另一方面,如果你正在名为main的git分支上运行,则: → Dataflow将你的工作流部署到计划程序生产群集上, → 并将TARGET_DB工作流变量设置为prod。
  • 最后...如果你位于一个未在配置中识别或注册的git分支上,Dataflow将使用默认设置,这意味着: → 它将你的工作流部署到调度程序沙盒群集上, → 并将TARGET_DB工作流变量设置为你自己的用户名的值。


 当然,上述设置并不完全适用于每个人,甚至对于同一用户在不同调整下运行相同的流水线也不适用,但是将该配置调整到足够满足你所需的任何类型的用例的程度并不困难。从一开始就使其过于复杂是没有意义的,特别是如果说,某个Dataflow管理的存储库中只有几个工作流,并且只由少数用户访问。


还值得注意的是,迄今为止上述“main”分支配置最常见的情况是仅在CI/CD框架内应用,而默认设置通常适用于本地环境,某人正在构建和测试其代码。


Pull-Request生命周期

Pull-Request(PR)生命周期是实现对ETL代码安全和可靠使用的关键概念。即使在使用Dataflow时,你也不必遵循它,但是当你这样做时,你的数据工程工作将变得更加愉快、可预测和可复制。


让我们来看一下这个图表:


1


几乎任何数据工程项目都可以使用上述模式安全地管理其代码和数据工件。主要问题在于,如果你的环境是一个全新的状态,并且没有类似DBT的平台可供使用,那么设置起来可能会很困难。在此时,Dataflow非常有用。让我们来看看它的功能如何与上述PR生命周期相对应。


使用Dataflow进行测试

如果你使用Dataflow设置ETL项目,你可以从所有这些测试方法中受益。但是重要的是要提到,Dataflow团队并不是从头开始创建所有这些功能,而是将现有的工具和库结合成一个一致的框架,以使数据流水线(在Netflix上)更高效、更轻松。


单元测试

过去,验证SQL语法是一个繁琐的过程。你必须将代码复制到SQL编辑器中,将所有参数替换为适当的值,并添加一个EXPLAIN [PLAN]语句。这种方法允许你检查代码的语法。但是,如果你需要进行更改,就有可能引入新的语法错误,必须重复整个过程。


动机

由于在生产环境、沙盒环境或SQL编辑器中运行整个流水线(带有嵌入的SQL)可能非常耗时,编写单元测试可以快速识别明显的错误。这种方法提高了开发过程的效率,并最大程度地减少了ETL代码中的风险。


隔离的PySpark

为了避免废话,让我们澄清一下关于单元测试的几个问题。不应该使用实际的spark会话进行单元测试,也不应该访问生产目录,因为这最终会导致在某个时刻在生产数据上出现问题。


因此,在Dataflow计划下,为了进行单元测试,我们开发了一个内部库来解决这个问题。这是一个Python模块,提供类似于unittest.TestCase库的接口,但基于与Netflix内部Spark完全相同的PySpark版本,并包含其所有UDF库。这个名为dataflow.unittest的模块与生产表隔离开来,非常适合在Spark环境中进行单元测试,而不影响任何实时数据。


以上Dataflow单元测试框架首先检查这个特殊的PySpark版本是否存在。如果找到了常规的(PySpark)版本,Dataflow会发出警告,建议不要在单元测试中使用它,以防意外访问生产数据库。


示例

让我们重新访问我们之前博客中的一个示例,其中我们每天计算出前一百部电影/电视节目。以下是一个完成此任务并将结果写入目标表的SQL脚本的优化引用:


-- Step 1: Aggregate view hours by title and country
WITH STEP_1 AS (
    SELECT
        title_id,
        country_code,
        SUM(view_hours) AS view_hours
    FROM schema.playback
    WHERE playback_date = CURRENT_DATE
    GROUP BY
        title_id,
        country_code
),
-- Step 2: Rank all titles from most watched to least in every country
STEP_2 AS (
    SELECT
        title_id,
        country_code,
        view_hours,
        RANK() OVER (
            PARTITION BY country_code 
            ORDER BY view_hours DESC
        ) AS title_rank
    FROM STEP_1
),
-- Step 3: Filter all titles to the top 100
STEP_3 AS (
    SELECT
        title_id,
        country_code,
        view_hours,
        title_rank
    FROM STEP_2
    WHERE title_rank <= 100
)
-- Write to target table
-- Insert the final results into the target table
INSERT INTO ${TARGET_DB}.dataflow_results
SELECT
    title_id,
    country_code,
    title_rank,
    view_hours,
    CURRENT_DATE AS date
FROM STEP_3;


为了确保全面的单元测试并避免任何漏洞,我们需要测试这个流水线中的SQL。此彻底的测试确保完全覆盖,并确保SQL逻辑(top100.sql)的每个部分都能正确可靠地运行。


此外,下面是文件结构的快速参考,其中包括单元测试本身和要进行测试的SQL文件:


top100
├── pipeline-definition.yaml
├── ddl
│   └── dataflow_targettable.sql
└── src
    ├── mocks
    │   ├── dataflow_top100_expected_sample.yaml
    │   ├── schema.playback.yaml
    ├── top100.sql


在示例单元测试中,我们将:

  • 准备参与我们工作流程的源表和目标表。
  • 然后,使用run_sql_from_file方法在本地Netflix专门的Spark环境中执行SQL代码进行测试。
  • 最后,将结果与模拟数据进行比较。


以下是一个完成此任务的单元测试类示例:


from dataflow.unittest import TestSparkSession
class TestSparkSQLWrite(TestSparkSession):
   def test_write(self):
       # prepare test arguments
       args = {"TARGET_DB": "foo", "TARGET_TABLE": ...}
       # prepare source table: schema.playback
       self.create_table_from_yaml("./mocks/schema.playback.yaml")
       # prepare target table
       self.create_table_from_ddl(
           "../ddl/dataflow_top100_expected_sample.sql", args
      )
       # run the job
       self.run_sql_from_file("./top100.sql", variables=args)

       # evaluate results
       self.compare_table_with_yaml(
           args.TARGET_TABLE, "./mocks/dataflow_top100_expected_sample.yaml"
       )


模拟数据集

Dataflow单元测试需要示例数据来确保转换的准确性。Netflix的数据工程师经常为他们的测试案例生成示例数据集。这些示例数据集是实际数据的一个小子集,并可以使用Dataflow的模拟功能创建和存储在一个YAML文件中。这个功能在我们之前的博客文章中已经讨论过,但以下是一个快速的命令行输出供参考:


$ dataflow mock save schema.playback
Sample data from schema.playback dataset successfully written to ./schema.playback.yaml!


在上面的示例中,从schema.playback表中生成了一些样本数据,并将其保存在一个YAML文件中。一旦准备好这些模拟数据,我们就可以在单元测试代码中使用它来构建一个模拟表。模拟数据集对于测试代码的所有输入表是必需的,但对于输出/目标表来说,你不需要这样做。当单元测试代码运行时,如果compare_table_with_yaml方法找不到要比较的表,它会为你创建一个。而当它这样做时,它也会测试失败,这样你就知道发生了什么,并有机会在合并代码之前进行审查。


总之,单元测试流水线可以确保你的SQL代码具有全面的覆盖率。虽然单元测试是良好软件开发实践的基石,但在ETL流水线中的重要性也同样重要。


集成测试

单元测试很酷,编写它们也很有趣。在开发周期中,它们可以快速提供反馈。然而,单元测试不会告诉你工作流程定义是否正确,或者你是否正确连接了工作流程步骤。这就是集成测试的用武之地。对于本文,我们将集成测试定义为在非生产环境中执行一个或多个工作流。


我们每个人可能都能理解“迷人”的ETL代码测试世界。你更改一些转换逻辑,保存,启动工作流程,并且在保存更改之前,你最好记得将代码的输出更改为临时目标,否则你将会度过一段不愉快的时光,可能需要恢复一些表。


在这种情况下,我们需要回想一下Dataflow分支驱动的配置。通过自动切换目标表的名称或不同数据库,取决于运行时环境,我们使集成测试不易出错,并且更加容易重现。


为了说明这种模式,我们将继续使用前一章中的示例流水线,这也是Dataflow提供的示例流水线。


首先让我们回顾一下在此处发挥作用的组件。假设我们有一个现有的在生产环境中运行的工作流名为dataflow_sparksql_sample.workflow。这个工作流每天运行一次,并且每天在一个名为dataflow_sparksql_sample的表中创建一个数据分区。让我们看一下部分定义:


Trigger:
  tz: US/Pacific
Variables:
  TARGET_TABLE: dataflow_top100_expected_sample
  TARGET_DB: ${DATAFLOW.TARGET_DB}
Workflow:
    id: dataflow_top100_expected_sample.workflow.${DATAFLOW.BRANCH}
    jobs:
      - job:
          id: ddl
          type: Spark
          spark:
              script: $S3{./ddl/dataflow_top100_expected_sample.sql}
              parameters:
                  TARGET_DB: ${DATAFLOW.TARGET_DB}
      - job:
          id: write
          type: Spark
          spark:
              script: $S3{./src/top100.sql}
              parameters:
                  SOURCE_DB: prod
                  TARGET_DB: ${DATAFLOW.TARGET_DB}
                  DATE: ${CURRENT_DATE}


请注意,上述工作流中列出的一些变量是由Dataflow管理的。这些变量必须在Dataflow配置文件中定义。例如,根据上述工作流定义,我们应确保定义了DATAFLOW.TARGET_DB变量。


示例dataflow.yaml配置文件可能如下所示:


defaults:
  workflows:
    variables:
      TARGET_DB: ${dataflow.username}_db
branches:
  staging:
    workflows:
      variables:
        TARGET_DB: stg
  main:
    workflows:
      variables:
        TARGET_DB: prod


现在,如果有人克隆了具有上述代码的存储库并需要修改SQL转换代码,他们只需要按照以下一般步骤进行操作:

  1. 如果必要的话,对转换代码和 DDL进行一些更改。调整单元测试并确保它们通过(参见上一章)。
  2. 使用以下命令在安全的用户命名空间中本地运行工作流dataflow project run ./dataflow_top100_sample.workflow.yaml:并且只要在自定义 git 分支中执行上述命令,输出表的目标数据库就会被替换为<username>_db。
  3. 提交你的代码,创建 PR,进行审核并将其合并到分支staging。Dataflow 管理的 CI/CD 将自动测试 PR,合并后它将在上下文中启动工作流的另一次运行。这意味着数据将根据 Dataflow 配置设置staging写入数据库中的目标表。stg


就是这样。一旦相同的pull-request进入到main分支,生产环境中的工作流程将自动更新,有很大的确定性经过多个测试阶段后,一切都可以按预期工作。请注意,为了在实际数据上测试已更新的转换,不需要进行任何调整,并且单个Dataflow命令可以根据其运行的git上下文安全地部署和执行工作流程。这使得开发周期更安全,并且易于重复执行。


但我们都知道,代码测试是一回事,而输入数据在我们认为已经考虑了所有边界情况时可能会让我们感到意外。这就是数据审核的作用所在...


数据审核

数据审核对于确保最终数据集适合使用至关重要。虽然单元测试和集成测试涵盖了技术方面,但数据审核验证数据的准确性,并与业务期望保持一致。每个业务用例都具有特定的数据要求,例如非空字段、定义值范围和预期每日行计数。数据审核验证传入数据是否满足这些条件,根据业务标准确认其一致性和可靠性。


为了实现这一点,Netflix的另一个团队开发了DataAuditor,这是一个Python库,旨在通过评估我们数据仓库中的表或列中的数据来确保数据质量。典型的审核过程包括选择数据,针对预定义的检查进行评估,并生成二进制结果(通过/失败)。DataAuditor的检查可以集成到任何工作流定义中,在数据审核失败的情况下,可以停止工作流或生成警报。由于DataAuditor作业使用表名和查询作为参数,因此可以在Dataflow分支配置范围内进行轻松调整。


DataAuditor提供了一套预定义的审核,用于常见的数据质量检查,只需要在工作流中进行最少的代码和配置。其中一些预定义的审核利用了Iceberg __partitions元数据表,实现快速执行,几乎没有开销,并且无需扫描整个数据集。以下是一些这些预定义审核的示例:

  • 列不应该有null值
  • 列在主键上应该是唯一的
  • 列的值应在范围内
  • 列的值应与正则表达式匹配


让我们深入了解如何将这些审核应用于我们的示例数据集(热门电影/电视节目),以确保数据的准确性和健壮性。


sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank LIMIT 5;
title_id | country_code | title_rank | view_hours | date
----------+--------------+------------+------------+----------
11111111 | US           |          1 |   123      | 20220101
44444444 | US           |          2 |   111      | 20220101
33333333 | US           |          3 |   98       | 20220101
55555555 | US           |          4 |   55       | 20220101
22222222 | US           |          5 |   11       | 20220101


在这个例子中,我们希望确保最终的数据产品满足以下要求:

  • 县代码应为非空列
  • 主键应为title_id和country_code(意味着该组合不应有重复的行)
  • country_code中的唯一值应为195(假设总共有195个国家)
  • 每个国家应有前10个,这意味着该表应有195 * 10行记录 = 1950(假设每个国家至少有10个受欢迎的项目)
  • 表的分区列应为dateint


以下是示例数据审计的代码示例:


data_auditor:
    audits:
      - function: table_should_be_unique_on_primary_key
        blocking: true
        params:
          table: ${dataflow.TARGET_DB}.dataflow_sample_results 
      - function: columns_should_not_have_nulls
        blocking: true
        params:
          table: ${dataflow.TARGET_DB}.dataflow_sample_results 
          columns: [country_code]
      - function: table_should_have_dateint_partitions
        params:
          table: ${dataflow.TARGET_DB}.dataflow_sample_results
      - function: query_should_return_country_count_within_range
        params:
          query_name: Total Country Count for Top10
          targets: ${TABLE_PATH}
          query: >
            SELECT
              COUNT(distinct country_code) AS total_country_count
            FROM ${dataflow.TARGET_DB}.dataflow_sample_results
            WHERE date = ${TODAY_DATE}
          lower_bound: 195
          upper_bound: 195
      - function: query_should_return_number_rows_within_range
        params:
          query_name: Total Row Count for Top10
          targets: ${dataflow.TARGET_DB}.dataflow_sample_results
          query: >
            SELECT
              COUNT(*) AS total_row_count
            FROM ${dataflow.TARGET_DB}.dataflow_sample_results
            WHERE date = ${TODAY_DATE}
          lower_bound: 1950
          upper_bound: 1950


DataAuditor使每个人都能将数据检查无缝地集成到工作流程中,实现对数据问题的早期检测和解决。这种主动的方法不仅增强了数据产品的质量,还增强了对从中获取的洞见的信任和信心。而DataAuditor最好的地方在于,你既可以用它来处理你拥有的数据集的生产部分,也可以用它来处理你从中读取的数据集的消费步骤,以防止输入数据的一些不希望的情况影响你的转换。


项目变量

如果上述框架在测试或部署项目时没有提供足够的灵活性,Dataflow还具有两个可调节项,使你的CI/CD工作流程完全按照你的意愿操作。这两个可调节项是项目变量和自定义钩子。


工作流程定义通常需要从环境或用户传递一些值,这并不罕见。借助项目变量,Dataflow使这成为可能。项目变量可以分为三个一般类型,在Dataflow managed工作流程中可供大家引用:

  • 系统变量
  • 工作流资产变量
  • 自定义变量


系统变量是由Dataflow开箱即用提供的,可以在工作流程定义中使用,无需进行其他设置。由于其范围有限,这些变量并不多。以下是一些示例:

  • ${dataflow.username} 返回当前的本地用户名
  • ${dataflow.branch} 返回当前的本地git分支名称
  • ${dataflow.commit_hash} 返回当前的git提交哈希


工作流资产变量使你能够引用由Dataflow构建的资产的唯一位置,无论是在当前项目还是在其他Dataflow项目中。以下是一些示例:

  • ${dataflow.jar.<namespace>} 返回某个命名空间下Dataflow项目的最新jar位置。
  • ${dataflow.egg.<namespace>} 返回某个命名空间下Dataflow项目的最新egg位置。
  • ${dataflow.jar.<namespace>.<build>} 返回某个命名空间下Dataflow项目的特定版本jar的位置。
  • ${dataflow.egg.<namespace>.<build>} 返回某个命名空间下Dataflow项目的特定版本egg的位置。


自定义变量是最有趣的类型。可以在项目范围内定义它们,然后根据执行的分支重新定义它们的值,从而在代码内部实现更深层次的CI/CD集成。假设你想定义一个变量${dataflow.my_db},然后根据执行的分支来调整它的值。以下是你可以在Dataflow配置文件中定义它的方式:


defaults:
  workflows:
    variables:
      my_db: default_db
branches:
  staging:
    workflows:
      variables:
        my_db: dev_db
  main:
    workflows:
      variables:
        my_db: prod_db


为了调试目的,当Dataflow渲染工作流程时,无论是为了测试还是部署,它都可以显示这些变量被替换为什么值。


自定义钩子

如果Dataflow项目变量不足以使你的存储库的CI/CD完全按照你的需求进行操作,则有一个额外的调节项可以实现这一点。它称为“自定义钩子”,它允许你无缝地插入脚本,无论是在任何标准Dataflow命令执行之前还是之后。让我们看一个示例。


假设你想向标准的dataflow project test命令添加一些逻辑。当然,如果这是一个普遍需要的逻辑,我们鼓励你向Dataflow仓库提交贡献的pull-request。但如果这只是你自己项目中需要的内容,那么你可以简单地将以下脚本添加到你的存储库中:

  • PROJECT_ROOT/dataflow-hooks/project/before-test.sh
  • PROJECT_ROOT/dataflow-hooks/project/after-test.sh


目录和文件命名约定使得Dataflow可以找到并运行这些文件,无需进行任何配置。请注意上面粗体显示的路径元素。这些脚本所调用的Dataflow命令被嵌入在路径中。对于此示例,让我们假设这些脚本只是分别输出“Hello world!”和“Bye-bye!”。


有了上述脚本,这就是你的dataflow project test命令的样子:


$ dataflow project testtest
Dataflow (<version>)
Running custom script <PROJECT_DIR>/dataflow-hooks/project/before-test.sh ...
Hello world!
# output of the actual "dataflow project test" command
Running custom script <PROJECT_DIR>/dataflow-hooks/project/after-test.sh ...
Bye-bye!


有人可能会说这些自定义钩子是不必要的。毕竟,你可能已经可以调整你的CI/CD作业,在Dataflow命令之前或之后执行任何自定义脚本。这是正确的...但请记住,Dataflow的主要目标之一是标准化,如果有人对你的项目不熟悉,尝试运行一个Dataflow命令,如果将其作为Dataflow钩子插入,它们将自动执行你的自定义逻辑。


结论

Dataflow在Netflix数据流水线生态系统中提供了一个强大的测试框架。对于以前不易进行单元测试的Spark SQL代码,这尤其有价值。所有这些测试功能,无论是用于单元测试、集成测试还是数据审计,都以Dataflow命令或Python库的形式提供,使其易于设置、易于运行,并且无法找到任何理由不使用强大的测试工具来监控你的所有ETL工作流程。而且最棒的部分是,一旦创建了所有这些测试,它们将在标准的Dataflow命令调用或CI/CD工作流程中自动运行,允许自动化检查可能对整个设置不熟悉的人员所做的代码更改。


文章来源:https://medium.com/@netflixtechblog/etl-development-life-cycle-with-dataflow-9c70c64aba7b
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消