对于生产级应用而言,数据验证是至关重要的一步。你需要确保所摄入的数据与你的处理流程相兼容,并且不存在异常值。此外,数据验证还是一种安全措施,能够防止任何损坏或不准确的信息被进一步处理,从而在最初步骤就发现问题。
Python已经有一个很棒的开源项目用于这项任务,那就是Pydantic。然而,在处理像机器学习中的大型类数据帧对象时,Pandera是一种更快且更具可扩展性的数据验证方式。
此外,Pandera支持多种数据帧库,如pandas、polars、dask、modin和pyspark.pandas。
使用Pandera验证数据
Pandera有两种定义验证器的方式:模式(Schemas)和模型(Models)。由于模型与Pydantic模型相似且代码简洁,我将重点介绍模型。
要定义一个Pandera模型,请创建一个继承自DataframeModel的子类,并开始声明数据帧必须具有的列和数据类型(dtypes):
import pandera as pa
class UserModel(pa.DataFrameModel):
id: int
username: str
email: str
is_active: bool
membership: str
creation_date: pd.DatetimeTZDtype
# Use
df = pd.DataFrame(...)
UserModel.validate(df) # <- If invalidad raises SchemaError
请注意,为了定义用户的创建时间戳,我使用了Pandas的原生日期类型,而不是其他如datetime.datetime的类型。Pandera仅支持内置的Python、NumPy和Pandas数据类型。你也可以创建自定义数据类型,但这是一个高级主题,在大多数情况下很少需要。
验证列属性
使用Pandera,除了数据类型之外,你还可以验证列的其他属性:
class UserModel(pa.DataFrameModel):
id: int = pa.Field(unique=True, ge=0)
username: str = pa.Field(str_matches=r"^[a-zA-Z0-9_]+$")
email: str = pa.Field(str_matches=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
is_active: bool
membership: str = pa.Field(isin=["premium", "free"])
creation_date: pd.DatetimeTZDtype = pa.Field(dtype_kwargs={"unit": "ns", "tz": "UTC"})
在这里,我使用Pandera的Field就像使用Pydantic的一样。
自定义验证
有时需要添加你自己的自定义验证。Pandera允许你注入列/索引检查(单列的自定义检查)和数据帧检查(多个列之间的检查)。
import pandera as pa
from pandera.typing import Series
class UserModel(pa.DataFrameModel):
id: int = pa.Field(unique=True, ge=0)
username: str = pa.Field(str_matches=r"^[a-zA-Z0-9_]+$")
email: str = pa.Field(
str_matches=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
is_active: bool
membership: str = pa.Field(isin=["premium", "free"])
creation_date: Annotated[pd.DatetimeTZDtype, "ns", "UTC"]
# column/index checks
@pa.check("username", name="username_length")
def username_length(cls, x: Series[str]) -> Series[bool]:
"""
Check username length is between 1 and 20 characters
"""
return x.str.len().between(1, 20)
@pa.check("creation_date", name="min_creation_date")
def min_creation_date(cls, x: Series[pd.DatetimeTZDtype]) -> Series[bool]:
"""
Check creation date is after 2000-01-01
"""
return x >= dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
# dataframe check
@pa.dataframe_check
def membership_is_valid(
cls, df: pd.DataFrame, name="membership_is_valid"
) -> Series[bool]:
"""
Check account age for free memebers is <= 30 days
"""
current_time = dt.datetime.now(dt.timezone.utc)
thirty_days = dt.timedelta(days=30)
return (df["membership"] == "premium") | (
(df["membership"] == "free")
& ((current_time - df["creation_date"]) <= thirty_days)
)
请记住,你正在处理整个列对象(Series),因此为了获得更好的性能,检查中的操作应该是向量化的。
其他配置
别名
当由于语言语法的原因,列名无法被声明为Python变量时,Pandera允许为列验证器设置一个别名来匹配数据帧中的列名。
class MyModel(pa.DataFrameModel):
alias_column: int = pa.Field(..., alias="Alias Column")
...
严格模式和强制转换
当strict选项被设置为true时,它会强制要求被验证的数据帧仅包含Pandera DataFrameModel中定义的列。另一方面,当激活coerce选项时,Pandera会尝试将列数据转换为与模型中的dtype相匹配的类型。
class MyModel(pa.DataFrameModel):
...
class Config:
strict = True # defaul: False
coerce = True # default: False
coerce选项也可以在Field级别通过设置pa.Field(..., coerce=True)来启用。
延迟验证
默认情况下,每当验证检查未通过时,pandera都会引发错误。这可能会很恼人,因为它只显示遇到的第一个验证错误,并阻止检查其余数据。
在某些情况下,最好让整个数据帧一次性完成验证,并收集所有错误,而不是逐个修复它们并等待验证再次运行。延迟验证就是这样做的。
df = pd.DataFrame(...)
Mymodel.validate(df, lazy_validation=True)
包含数据验证的机器学习生产流水线
由于大多数机器学习流水线都是在Python中使用表格数据(编码为数据帧结构)进行训练的,因此Pandera是一个强大且出色的工具,用于验证这些流水线的输入和输出。
# pipeline.py
class MLPipeline:
"""General ML Pipeline"""
def __init__(self, model_id: str):
self.model_id = model_id
def load_model(self) -> None:
...
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
... # <- Potential invalid data error
return df_transform
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
self.load_model()
df_transform = self.transform(df)
df['score'] = self.model.predict(df_transform) # <- Potential invalid data error
return df
我们希望避免模型因无效数据而引发错误。那将意味着我们为加载模型到内存和处理原始数据所做的所有工作都徒劳无功,既浪费了资源,又阻止了其余数据点的评估。
同样,如果模型的输出结构不正确,我们的后处理流水线(如将结果上传到数据库、通过RESTful API返回结果等)也将失败。
在使用Pandera定义了验证模型之后,我们可以利用其装饰器进行流水线集成,以执行输入/输出验证。
# models.py
import pandera as pa
class InputModel(pa.DataFrameModel):
...
class PredictorModel(pa.DataFrameModel):
...
# OutputModel inherits all InputModel validation fields
# and also includes the score
class OutputModel(InputModel):
score: float = pa.Field(ge=0, le=1) # assuming model returns probab.
# pipeline.py
import pandera as pa
from .models import InputModel, PredictorModel, OutputModel
class MLPipeline:
"""General ML Pipeline"""
def __init__(self, model_id: str):
self.model_id = model_id
def load_model(self) -> None:
...
@pa.check_io(df=InputModel.to_schema(), out=PredictorModel.to_schema(), lazy=True)
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
...
return df_transform
@pa.check_output(OutputModel.to_schema(), lazy=True)
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
self.load_model()
df_transform = self.transform(df)
df['score'] = self.model.predict(df_transform)
return df
因为我们在机器学习流水线中生成了一个中间数据帧对象df_transform,所以验证它也是一个好习惯,以防止出现错误。predict方法的输入没有进行验证,因为transform_data已经完成了这项工作。
处理无效行
我们不希望流水线仅仅因为一些数据点包含错误数据而中断。在出现验证错误的情况下,策略应该是将有问题的数据点搁置一旁,并继续用其余数据运行流水线。流水线不能停下来!
Pandera模型提供了自动删除所有无效行的选项:
class MyModel(pa.DataFrameModel):
...
class Config:
drop_invalid_rows = True
然而,不记录就删除所有无效行可能是危险的。你需要知道那些数据点为什么无效,以便之后能够向客户或数据工程师说明错误的原因。
这就是为什么我宁愿创建自己的验证辅助函数,而不是使用pandera的装饰器。
from typing import Tuple
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def log_pandera_errors(exc: pa.errors.SchemaErrors) -> None:
"""
Logs all errors from a SchemaErrors exception.
"""
for err_type, categories in exc.message.items():
for _, errors in categories.items():
for err in errors:
logger.error(f"{err_type} ERROR: {err['column']}. {err['error']}")
def handle_invalid(
df: pd.DataFrame, exc: pa.errors.SchemaErrors
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Handles invalid data in a DataFrame based on a SchemaErrors exception.
"""
log_pandera_errors(exc)
df_failure = exc.failure_cases
# Check for errors that cannot be resolved
# i.e. they aren't associated with a specific row index
nan_indices = df_failure["index"].isna()
if nan_indices.any():
error_msg = "\n".join(
f" - Column: {row['column']}, check: {row['check']}, "
f"failure_case: {row['failure_case']}"
for row in df_failure[nan_indices].to_dict("records")
)
raise ValueError(
f"Schema validation failed with no possibility of continuing:\n{error_msg}\n"
"The pipeline cannot continue ?. Resolve before rerunning"
)
invalid_idcs = df.index.isin(df_failure["index"].unique())
df_invalid = format_invalid_df(df.loc[invalid_idcs, :], exc)
df_valid = df.iloc[~invalid_idcs]
return df_valid, df_invalid
def validate(
df: pd.DataFrame, model: pa.DataFrameModel
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Validates a DataFrame against a DataFrameModel and handles errors.
"""
try:
return model.validate(df, lazy=True), pd.DataFrame()
except pa.errors.SchemaErrors as ex:
return handle_invalid(df, ex)
输出强制产生一些错误并移除列id
# Error output
ERROR:__main__:SCHEMA ERROR: UserModel. column 'id' not in dataframe. Columns in dataframe: ['username', 'email', 'membership', 'is_active', 'creation_date']
ERROR:__main__:DATA ERROR: username. Column 'username' failed element-wise validator number 0: str_matches('^[a-zA-Z0-9_]+$') failure cases: b%09
ERROR:__main__:DATA ERROR: email. Column 'email' failed element-wise validator number 0: str_matches('^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$') failure cases: ef.com
ERROR:__main__:DATA ERROR: UserModel. DataFrameSchema 'UserModel' failed element-wise validator number 0: <Check membership_is_valid> failure cases: c, ef.com, free, True, 2000-12-31 00:00:00+00:00
ValueError: Schema validation failed with no possibility of continuing:
- Column: UserModel, check: column_in_dataframe, failure_case: id
The pipeline cannot continue ?. Resolve before rerunning
如果遇到涉及整列的无法解决的错误,流水线将无法继续。
测试
最后但同样重要的是,Pandera模型和模式还包含了一种根据其定义生成样本数据的方法。要使用它,你需要安装hypothesis库。
然而,在通过一些示例进行测试后,我并不推荐使用它。一旦你开始添加一些约束,生成合成数据所需的时间就会变得过长,而且大多数时候数据并不多样(生成的数据没有覆盖整个限制空间,而是重复出现)。我发现的最佳替代方案是为每个你想要测试的模型添加数据生成器——毕竟,在流水线中也没有那么多数据帧需要验证。
class UserModel(pa.DataFrameModel):
...
def sample(size: int = 10) -> pd.DataFrame:
"""Added method to generate valid test data manually"""
current_time = dt.datetime.now(dt.timezone.utc)
return pd.DataFrame(
{
"id": range(size),
"username": [f"user_{i}" for i in range(size)],
"email": [f"user_{i}@example.com" for i in range(size)],
"is_active": [True] * size,
"membership": ["premium"] * size, # All premium to pass checks
"creation_date": [current_time] * size,
}
)
结论
数据验证对于每个数据处理流水线都至关重要,尤其是在机器学习中。Pandera通过提供一种灵活且高效的基于模型的方法来验证数据帧中的数据,大大简化了这项工作。
使用Pandera,你可以定义模型类来强制列类型、范围和甚至复杂的条件约束。这使得在流水线的早期阶段就容易捕获数据质量问题,确保数据在到达下一步之前符合预期标准。
通过将Pandera集成到机器学习流水线中,你可以创建健壮的数据检查,从而有助于防止错误并提高模型输出的可靠性。
测试中使用的最终Pandera DataFrameModel为:
import pandas as pd
import pandera as pa
from pandera.typing import Series
from typing import Annotated
import datetime as dt
class UserModel(pa.DataFrameModel):
id: int = pa.Field(unique=True, ge=0, coerce=False)
username: str = pa.Field(str_matches=r"^[a-zA-Z0-9_]+$")
email: str = pa.Field(
str_matches=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
is_active: bool
membership: str = pa.Field(isin=["premium", "free"])
creation_date: Annotated[pd.DatetimeTZDtype, "ns", "UTC"]
@pa.check("username", name="username_length")
def username_length(cls, x: Series[str]) -> Series[bool]:
"""
Check username length is between 1 and 20 characters
"""
return x.str.len().between(1, 20)
@pa.check("creation_date", name="min_creation_date")
def min_creation_date(cls, x: Series[pd.DatetimeTZDtype]) -> Series[bool]:
"""
Check creation date is after 2000-01-01
"""
return x >= dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
@pa.dataframe_check
def membership_is_valid(
cls, df: pd.DataFrame, name="membership_is_valid"
) -> Series[bool]:
"""
Check account age for free memebers is <= 30 days
"""
current_time = dt.datetime.now(dt.timezone.utc)
thirty_days = dt.timedelta(days=30)
return (df["membership"] == "premium") | (
(df["membership"] == "free")
& ((current_time - df["creation_date"]) <= thirty_days)
)
class Config:
strict = True
coerce = True
def sample(size: int = 10) -> pd.DataFrame:
"""Added method to generate valid test data manually"""
current_time = dt.datetime.now(dt.timezone.utc)
return pd.DataFrame(
{
"id": range(size),
"username": [f"user_{i}" for i in range(size)],
"email": [f"user_{i}@example.com" for i in range(size)],
"is_active": [True] * size,
"membership": ["premium"]
* size, # All premium to avoid date restrictions
"creation_date": [current_time] * size,
}
)