【指南】了解Airbnb的ML功能平台Chronon

2024年04月10日 由 alex 发表 89 0

5


公司背景

我们创建 Chronon 的目的是为了缓解 ML 从业人员的一个共同痛点:他们将大部分时间都花在了管理为模型提供支持的数据上,而不是建模本身。


在 Chronon 诞生之前,从业人员会使用以下两种方法之一:


  1. 离线-在线复制: 人工智能从业人员使用数据仓库中的数据训练模型,然后想办法在在线环境中复制这些功能。这种方法的好处是,它允许从业人员利用完整的数据仓库,既有数据源,又有用于大规模数据转换的强大工具。缺点是没有明确的方法为在线推理提供模型特征,导致不一致和标签泄漏,严重影响模型性能。
  2. 记录并等待:ML 实践者从在线服务环境中可用的数据开始,模型推理将从在线服务环境中运行。他们将相关特征记录到数据仓库中。一旦积累了足够多的数据,他们就会在日志上训练模型,并使用相同的数据提供服务。这种方法的优点是保证了一致性,不太可能出现泄漏。不过,这种方法的主要缺点是等待时间长,无法快速响应不断变化的用户行为。


Chronon 方法可实现两全其美。Chronon 只需要人工智能从业人员定义一次特征,就能同时支持用于模型训练的离线流程和用于模型推理的在线流程。此外,Chronon 还为特征链、可观测性和数据质量以及特征共享和管理提供了强大的工具。


工作原理

下面,我们将使用快速入门指南中的一个简单示例,探讨支持 Chronon 大部分功能的主要组件。


假设我们是一家大型在线零售商,我们根据用户购物后退货的情况检测到了欺诈向量。我们想训练一个模型来预测给定交易是否可能导致欺诈性退货。每次用户开始结账流程时,我们都将调用该模型。


定义特征

购买数据: 我们可以将购买日志数据汇总到用户级别,以便了解该用户之前在我们平台上的活动。具体来说,我们可以计算用户在不同时间窗口内之前购买金额的总和、数量和平均值。


source = Source(
    events=EventSource(
        table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily"data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
        topic="events/purchases", # The streaming source topic
        query=Query(
            selects=select("user_id","purchase_price"), # Select the fields we care about
            time_column="ts") # The event time
    ))
window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below
v1 = GroupBy(
    sources=[source],
    keys=["user_id"], # We are aggregating by user
    online=True,
    aggregations=[Aggregation(
            input_column="purchase_price",
            operation=Operation.SUM,
            windows=window_sizes
        ), # The sum of purchases prices in various windows
        Aggregation(
            input_column="purchase_price",
            operation=Operation.COUNT,
            windows=window_sizes
        ), # The count of purchases in various windows
        Aggregation(
            input_column="purchase_price",
            operation=Operation.AVERAGE,
            windows=window_sizes
        ), # The average purchases by user in various windows
        Aggregation(
            input_column="purchase_price",
            operation=Operation.LAST_K(10),
        ), # The last 10 purchase prices aggregated as a list
    ],
)


这将创建一个 `GroupBy`,通过聚合不同时间窗口的不同字段,以 `user_id` 作为主键,将 `purchases` 事件数据转换为有用的特征。


这将原始购买日志数据转换为用户级别的有用特征。


用户数据: 将用户数据转化为特征数据要简单一些,主要是因为我们不必担心执行聚合。在这种情况下,源数据的主键与特征的主键相同,因此我们只需提取列值,而无需对行执行聚合:


source = Source(
    entities=EntitySource(
        snapshotTable="data.users", # This points to a table that contains daily snapshots of all users"data.users", # This points to a table that contains daily snapshots of all users
        query=Query(
            selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about
        )
    ))
v1 = GroupBy(
    sources=[source],
    keys=["user_id"], # Primary key is the same as the primary key for the source table
    aggregations=None, # In this case, there are no aggregations or windows to define
    online=True,


这将创建一个 `GroupBy`,从 `data.users` 表中提取维度作为特征,并将 `user_id` 作为主键。


将这些特征连接在一起: 接下来,我们需要将之前定义的特征组合成一个视图,该视图既可以回填用于模型训练,也可以作为完整的向量在线用于模型推理。我们可以使用 "连接 "应用程序接口(Join API)来实现这一目标。


对于我们的用例来说,以正确的时间戳计算特征是非常重要的。因为我们的模型是在结账流开始时运行的,所以我们希望在回填中使用相应的时间戳,这样模型训练的特征值在逻辑上就与模型在在线推理中看到的一致。


定义如下所示。请注意,它结合了我们之前在 API 的 right_parts 部分中定义的特征(以及另一个名为 returns 的特征集)。


source = Source(
    events=EventSource(
        table="data.checkouts", "data.checkouts", 
        query=Query(
            selects=select("user_id"), # The primary key used to join various GroupBys together
            time_column="ts",
            ) # The event time used to compute feature values as-of
    ))
v1 = Join(  
    left=source,
    right_parts=[JoinPart(group_by=group_by) for group_by in [purchases_v1, returns_v1, users]] # Include the three GroupBys
)


回填/离线计算

用户在使用上述 Join 定义时可能做的第一件事就是对其进行回填,以生成用于模型训练的历史特征值。Chronon 执行这种回填有几个主要优点:


  1. 时间点准确性: 请注意用作上述连接 "左 "侧的源。它建立在 "data.checkouts "源之上,该源包括每一行上的 "ts "时间戳,与特定结账的逻辑时间相对应。每个特征计算都保证了该时间戳的窗口准确性。因此,对于用户一个月前的购买总和,每一行都将根据左侧数据源提供的时间戳来计算。
  2. 偏差处理: Chronon 的回填算法经过优化,可处理高度倾斜的数据集,避免令人沮丧的 OOM 和挂起作业。
  3. 计算效率优化: Chronon 能够直接在后端嵌入大量优化功能,从而减少计算时间和成本。


在线计算

Chronon 为在线功能计算抽象了大量复杂性。在上述示例中,它会根据特征是批量特征还是流特征来计算特征。


批处理特征(例如上述用户特征)

由于用户功能建立在批处理表之上,Chronon 只需每天运行一次批处理作业,在批处理数据存储区出现新数据时计算新功能值,并将其上传到在线 KV 存储区以供使用。


流功能(例如上述购买功能)

购买 "功能建立在包含流组件的源上,如源中包含的 "主题 "所示。在这种情况下,除了实时更新的流作业外,Chronon 还将运行批量上传。批处理作业负责:


  1. 为值播种: 对于较长的窗口,倒带流并回放所有原始事件是不现实的。
  2. 压缩 "窗口中间 "并提供尾部精度: 为了获得精确的窗口精度,我们需要窗口头部和尾部的原始事件。


然后,流作业会向 KV 存储写入更新,以便在获取时保持特征值的最新状态。


在线服务/获取 API

Chronon 提供了一个应用程序接口(API),可在低延迟的情况下获取特征值。我们既可以获取单个 GroupBys(即上文定义的用户或购买功能)的值,也可以获取 Join 的值。下面是一个例子,说明了一个 Join 的请求和响应:


// Fetching all features for user=123
Map<String, String> keyMap = new HashMap<>();
keyMap.put("user", "123")
Fetcher.fetch_join(new Request("quickstart_training_set_v1", keyMap));
// Sample response (map of feature name to value)
'{"purchase_price_avg_3d":14.2341, "purchase_price_avg_14d":11.89352, ...}'


获取用户 123 所有特征的 Java 代码。返回类型是特征名称到特征值的映射。


上述示例使用的是 Java 客户端。此外还有 Scala 客户端和 Python CLI 工具,便于测试和调试:


run.py --mode=fetch -k '{"user_id":123}' -n quickstart/training_set -t join'{"user_id":123}' -n quickstart/training_set -t join
> {"purchase_price_avg_3d":14.2341, "purchase_price_avg_14d":11.89352, ...}


run.py 是快速测试 Chronon 工作流(如获取)的便捷方法。


另一种方法是将这些 API 包装成服务,通过 REST 端点发出请求。Airbnb 采用这种方法在 Ruby 等非 Java 环境中获取功能。


线上线下一致性

Chronon 不仅有助于提高在线-离线准确性,还提供了一种测量方法。测量管道从在线获取请求的日志开始。这些日志包括请求的主键和时间戳,以及获取的特征值。然后,Chronon 将主键和时间戳作为左侧传递给 Join backfill,要求计算引擎回填特征值。然后,它将回填值与实际获取值进行比较,以衡量一致性。


结论

开源只是第一步,我们的愿景是创建一个平台,让人工智能从业者能够就如何利用数据做出最佳决策,并尽可能轻松地实施这些决策。


文章来源:https://medium.com/airbnb-engineering/chronon-airbnbs-ml-feature-platform-is-now-open-source-d9c4dba859e8
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消