使用Apache Spark构建机器学习管道

2025年06月03日 由 佚名 发表 231 0


Machine Learning Pipelines with Apache Spark


 Apache Spark是一个用于处理大数据的工具。它免费且速度极快,能够处理不适合计算机内存的大量数据。机器学习管道是一系列用于准备数据和训练模型的步骤,包括数据收集、数据清理、特征选择、模型训练以及模型效果评估。

Spark使得构建这些管道变得简单。通过使用Spark,公司可以快速分析大量数据并创建机器学习模型,从而帮助他们基于现有信息做出更明智的决策。在本文中,我们将介绍如何在Spark中设置和使用机器学习管道。


Spark中机器学习管道的组成部分

 
Spark的MLlib库提供了许多内置工具,这些工具可以组合在一起构建完整的机器学习流程。


转换器

转换器用于以某种方式改变数据。它们接收一个DataFrame并返回其修改后的版本,常用于编码分类数据或缩放数值特征。示例包括用于编码的StringIndexer和用于缩放的StandardScaler。转换器是可重用的,并且不会永久更改原始数据。


估计器

估计器通过从数据中学习来创建模型,包括像LogisticRegression和RandomForestClassifier这样的算法。估计器使用fit方法在数据上进行训练,并输出一个可以进行预测的Model对象。


管道

管道是一个将转换器和估计器连接成单一工作流程的工具。通过按顺序组织它们,数据可以顺畅地从一个步骤流向下一个步骤。管道使得重新训练模型、重复过程和调整参数变得更加容易。


让我们通过一个基本示例来构建一个用于预测客户流失的分类管道。在这个管道中,我们将:


  1. 加载数据:将数据集导入Spark进行处理。
  2. 预处理数据:清理和准备数据以进行建模。
  3. 设置模型:准备逻辑回归模型。
  4. 训练模型:将机器学习模型拟合到数据上。
  5. 评估模型:检查模型的表现如何。

初始化Spark会话并加载数据集


首先,我们使用SparkSession.builder来设置会话。然后,我们加载客户流失数据集,这些数据涉及关闭账户的银行客户。


from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

# Load dataset
data = spark.read.csv("/content/Customer Churn.csv", header=True, inferSchema=True)

# Show the first few rows of the dataset
data.show(5)

 
dataset


数据预处理

 
首先,我们检查数据中是否有缺失值。如果有缺失值,我们会删除这些行以确保数据完整。接下来,我们将分类数据转换为数值格式,以便计算机可以理解。我们使用StringIndexer和OneHotEncoder等方法来实现这一点。最后,我们将所有特征组合成一个向量并对数据进行缩放。


from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Check for missing values
missing_values = data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in data.columns])

# Drop rows with any missing values
data = data.na.drop()

# Identify categorical columns
categorical_columns = ['country', 'gender', 'credit_card', 'active_member']

# Create a list to hold the stages of the pipeline
stages = []

# Apply StringIndexer to convert categorical columns to numerical indices
for column in categorical_columns:
indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
stages.append(indexer)

# Apply OneHotEncoder for categorical features
encoder = OneHotEncoder(inputCols=[column + "_index"], outputCols=[column + "_ohe"])
stages.append(encoder)

label_column = 'churn' # The label column
feature_columns = [column + "_ohe" for column in categorical_columns]

# Add numerical columns to the features list
numerical_columns = ['credit_score', 'age', 'tenure', 'balance', 'products_number', 'estimated_salary']
feature_columns += numerical_columns

# Create VectorAssembler to combine all feature columns
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
stages.append(vector_assembler)

# Scale the features using StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
stages.append(scaler)

 

逻辑回归模型设置


我们导入LogisticRegressionpyspark.ml.classification。接下来,我们通过使用LogisticRegression()创建一个逻辑回归模型。


from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Logistic Regression Model
lr = LogisticRegression(featuresCol='scaled_features', labelCol=label_column)
stages.append(lr)

# Create and Run the Pipeline
pipeline = Pipeline(stages=stages)

 

模型训练和预测

 
我们将数据集分为训练集和测试集。然后,我们将管道模型拟合到训练数据上,并对测试数据进行预测。


# Split data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Fit the model
pipeline_model = pipeline.fit(train_data)

# Make Predictions
predictions = pipeline_model.transform(test_data)

# Show the predictions
predictions.select("prediction", label_column, "scaled_features").show(10)

 
predictions
 

模型评估


我们导入MulticlassClassificationEvaluatorpyspark.ml.evaluation来评估我们模型的性能。我们使用模型的预测结果计算准确率、精确率、召回率和F1分数。最后,我们停止Spark会话以释放资源。


from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Precision
evaluator_precision = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictions)
print(f"Precision: {precision}")

# Recall
evaluator_recall = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictions)
print(f"Recall: {recall}")

# F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print(f"F1 Score: {f1_score}")

# Stop Spark session
spark.stop()

 
evaluation


结论


在本文中,我们了解了如何在Apache Spark中构建机器学习管道。管道帮助组织机器学习过程的每一步。我们从加载和清理客户流失数据集开始,然后转换数据并创建了一个逻辑回归模型。在训练模型后,我们对新数据进行了预测。最后,我们使用准确率、精确率、召回率和F1分数评估了模型的性能。
 

    文章来源:https://www.kdnuggets.com/implementing-machine-learning-pipelines-with-apache-spark
    欢迎关注ATYUN官方公众号
    商务合作及内容投稿请联系邮箱:bd@atyun.com
    评论 登录
    热门职位
    Maluuba
    20000~40000/月
    Cisco
    25000~30000/月 深圳市
    PilotAILabs
    30000~60000/年 深圳市
    写评论取消
    回复取消