Amazon SageMaker管道 允许数据科学家和机器学习 (ML) 工程师自动化训练工作流程,这有助于您创建可重复的流程来协调模型开发步骤,以实现快速实验和模型再训练。 您可以自动化整个模型构建工作流程,包括数据准备、特征工程、模型训练、模型调整和模型验证,并将其编入模型注册表。 您可以将管道配置为定期自动运行或在触发某些事件时自动运行,也可以根据需要手动运行它们。
在这篇文章中,我们重点介绍了对 亚马逊SageMaker SDK 并引入 Amazon SageMaker Pipelines 的新功能,让 ML 从业者更轻松地构建和训练 ML 模型。
Pipelines 不断创新其开发人员体验,通过这些最新版本,您现在可以以更加自定义的方式使用该服务:
在这篇文章中,我们将使用示例数据集引导您完成工作流程,重点是模型构建和部署,以演示如何实现 Pipelines 的新功能。 最后,您应该有足够的信息来成功使用这些新功能并简化您的 ML 工作负载。
功能概述
Pipelines 提供以下新功能:
- 管道变量注释 – 某些方法参数接受多种输入类型,包括
PipelineVariables
,并添加了其他文档以阐明在哪里 PipelineVariables
SageMaker SDK 文档的最新稳定版本和函数的初始化签名都支持。 例如,在以下 TensorFlow 估计器中,init 签名现在表明 model_dir
和 image_uri
SUPPORT PipelineVariables
,而其他参数没有。 有关详细信息,请参阅 TensorFlow 估计器.
- 之前:
TensorFlow(
py_version=None,
framework_version=None,
model_dir=None,
image_uri=None,
distribution=None,
**kwargs,
)
- 后:
TensorFlow(
py_version: Union[str, NoneType] = None,
framework_version: Union[str, NoneType] = None,
model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
distribution: Union[Dict[str, str], NoneType] = None,
compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
**kwargs,
)
- 管道会话 – 管道会话 是引入的一个新概念,用于在 SageMaker SDK 中实现统一,并引入了管道资源的延迟初始化(运行调用被捕获,但在管道创建和运行之前不会运行)。 这
PipelineSession
上下文继承 SageMakerSession
并为您实现与其他 SageMaker 实体和资源交互的便捷方法,例如训练作业、端点和存储在 亚马逊简单存储服务 (亚马逊S3)。
- 与工作流管道作业步骤的子类兼容性 – 您现在可以像没有管道一样构建作业抽象,并配置和运行处理、训练、转换和调整作业。
- 例如,创建一个处理步骤
SKLearnProcessor
以前需要以下内容:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
role=role,
)
step_process = ProcessingStep(
name="{pipeline-name}-process",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
- 正如我们在前面的代码中看到的,
ProcessingStep
需要做基本相同的预处理逻辑 .run
, 只是不启动 API 调用来启动作业。 但是现在通过工作流管道作业步骤启用子类兼容性,我们声明 step_args
使用 .run 进行预处理逻辑的参数,因此您可以构建作业抽象并对其进行配置,就像在没有管道的情况下使用它一样。 我们还通过 pipeline_session
,这是一个 PipelineSession
对象,而不是 sagemaker_session
确保在创建和运行管道之前捕获运行调用但不调用。 请参阅以下代码:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
role=role,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)
- 模型步骤(具有模型创建和注册步骤的简化方法) –Pipelines 提供了两种与 SageMaker 模型集成的步骤类型:
CreateModelStep
和 RegisterModel
. 您现在只需使用 ModelStep
类型。 请注意,一个 PipelineSession
需要实现这一点。 这带来了流水线步骤和 SDK 之间的相似性。
- 之前:
step_register = RegisterModel(
name="ChurnRegisterModel",
estimator=xgb_custom_estimator,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.large"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
-
- 后:
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
- 失败步骤(管道运行的有条件停止) –
FailStep
如果满足条件(例如模型分数低于某个阈值),则允许管道以失败状态停止。
解决方案概述
在此解决方案中,您的入口点是 亚马逊SageMaker Studio 用于快速实验的集成开发环境 (IDE)。 Studio 提供了一个管理端到端管道体验的环境。 使用 Studio,您可以绕过 AWS管理控制台 为您的整个工作流程管理。 有关在 Studio 中管理流水线的更多信息,请参阅 在 SageMaker Studio 中查看、跟踪和执行 SageMaker 管道.
下图说明了 ML 工作流的高级架构,其中包含使用新功能训练和生成推理的不同步骤。
该管道包括以下步骤:
- 预处理数据以构建所需的特征并将数据拆分为训练、验证和测试数据集。
- 使用 SageMaker XGBoost 框架创建训练作业。
- 使用测试数据集评估经过训练的模型。
- 检查 AUC 分数是否高于预定义阈值。
- 如果 AUC 分数小于阈值,则停止管道运行并将其标记为失败。
- 如果 AUC 分数大于阈值,请创建 SageMaker 模型并将其注册到 SageMaker 模型注册表中。
- 使用在上一步中创建的模型对给定数据集应用批量转换。
先决条件
要继续阅读这篇文章,您需要一个 AWS 账户 工作室域.
Pipelines 直接与 SageMaker 实体和资源集成,因此您无需与任何其他 AWS 服务进行交互。 您也不需要管理任何资源,因为它是一项完全托管的服务,这意味着它会为您创建和管理资源。 有关作为独立 Python API 以及 Studio 集成组件的各种 SageMaker 组件的更多信息,请参阅 SageMaker 产品页面.
在开始之前,使用以下代码片段在 Studio 笔记本中安装 SageMaker SDK 版本 >= 2.104.0 和 xlrd >=1.0.0:
print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
import sagemaker
机器学习工作流程
对于这篇文章,您使用以下组件:
- 资料准备
- SageMaker处理 – SageMaker Processing 是一项完全托管的服务,允许您为 ML 工作负载运行自定义数据转换和特征工程。
- 建筑模型
- 模型训练和评估
- 一键培训 – SageMaker 分布式训练功能。 SageMaker 为数据并行和模型并行提供分布式训练库。 这些库针对 SageMaker 训练环境进行了优化,有助于使您的分布式训练作业适应 SageMaker,并提高训练速度和吞吐量。
- SageMaker实验 – 实验是 SageMaker 的一项功能,可让您组织、跟踪、比较和评估您的 ML 迭代。
- SageMaker批量转换 – 批量转换或离线评分是 SageMaker 中的一项托管服务,可让您使用 ML 模型对更大的数据集进行预测。
- 工作流程编排
SageMaker 管道是由 JSON 管道定义定义的一系列相互关联的步骤。 它使用有向无环图 (DAG) 对管道进行编码。 DAG 提供有关管道每个步骤的要求和关系的信息,其结构由步骤之间的数据依赖关系决定。 当一个步骤的输出属性作为输入传递给另一个步骤时,就会创建这些依赖关系。
下图说明了 SageMaker 管道(用于客户流失预测用例)中的不同步骤,其中步骤之间的连接由 SageMaker 根据步骤定义定义的输入和输出推断。
接下来的部分将介绍创建管道的每个步骤并在创建后运行整个管道。
项目结构
让我们从项目结构开始:
- /sm-pipelines-end-to-end-example – 项目名称
- /数据 – 数据集
- /管道 – 管道组件的代码文件
- sagemaker-pipelines-project.ipynb – 使用 Pipelines 的新功能完成建模工作流程的笔记本
下载数据集
要跟随这篇文章,您需要下载并保存 样本数据集 在项目主目录中的数据文件夹下,将文件保存在 亚马逊弹性文件系统 (Amazon EFS) 在 Studio 环境中。
构建管道组件
现在您已准备好构建管道组件。
导入语句并声明参数和常量
创建一个名为 sagemaker-pipelines-project.ipynb
在项目主目录中。 在单元格中输入以下代码块,然后运行单元格以设置 SageMaker 和 S3 客户端对象,创建 PipelineSession
,并使用 SageMaker 会话附带的默认存储桶设置 S3 存储桶位置:
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"
管道支持参数化,它允许您在运行时指定输入参数,而无需更改管道代码。 您可以使用以下可用的模块 sagemaker.workflow.parameters
模块,例如 ParameterInteger
, ParameterFloat
及 ParameterString
, 指定各种数据类型的管道参数。 运行以下代码设置多个输入参数:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
input_data = ParameterString(
name="InputData",
default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
生成批处理数据集
生成批处理数据集,稍后在批处理转换步骤中使用:
def preprocess_batch_data(file_path):
df = pd.read_csv(file_path)
## Convert to datetime columns
df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
## Drop Rows with null values
df = df.dropna()
## Create Column which gives the days between the last order and the first order
df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
## Create Column which gives the days between when the customer record was created and the first order
df['created'] = pd.to_datetime(df['created'])
df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
## Drop Columns
df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
## Apply one hot encoding on favday and city columns
df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
return df
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)
将数据上传到 S3 存储桶
将数据集上传到 Amazon S3:
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")
定义处理脚本和处理步骤
在此步骤中,您准备一个 Python 脚本来执行特征工程、一个热编码,并策划用于模型构建的训练、验证和测试拆分。 运行以下代码来构建您的处理脚本:
%%writefile pipelines/customerchurn/preprocess.py
import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
base_dir = "/opt/ml/processing"
#Read Data
df = pd.read_csv(
f"{base_dir}/input/storedata_total.csv"
)
# convert created column to datetime
df["created"] = pd.to_datetime(df["created"])
#Convert firstorder and lastorder to datetime datatype
df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
#Drop Rows with Null Values
df = df.dropna()
#Create column which gives the days between the last order and the first order
df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
#Create column which gives the days between the customer record was created and the first order
df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
#Drop columns
df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
#Apply one hot encoding on favday and city columns
df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
# Split into train, validation and test datasets
y = df.pop("retained")
X_pre = df
y_pre = y.to_numpy().reshape(len(y), 1)
X = np.concatenate((y_pre, X_pre), axis=1)
np.random.shuffle(X)
# Split in Train, Test and Validation Datasets
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
train_rows = np.shape(train)[0]
validation_rows = np.shape(validation)[0]
test_rows = np.shape(test)[0]
train = pd.DataFrame(train)
test = pd.DataFrame(test)
validation = pd.DataFrame(validation)
# Convert the label column to integer
train[0] = train[0].astype(int)
test[0] = test[0].astype(int)
validation[0] = validation[0].astype(int)
# Save the Dataframes as csv files
train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
接下来,运行以下代码块以实例化处理器和 Pipelines 步骤以运行处理脚本。 因为处理脚本是用 Pandas 编写的,所以您使用 SKLearn处理器. 管道 ProcessingStep
函数采用以下参数:处理器、原始数据集的输入 S3 位置以及用于保存已处理数据集的输出 S3 位置。
# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="sklearn-churn-process",
role=role,
sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
destination=f"s3://{default_bucket}/output/train" ),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
destination=f"s3://{default_bucket}/output/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
destination=f"s3://{default_bucket}/output/test")
],
code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)
定义训练步骤
使用 SageMaker XGBoost 估计器和管道设置模型训练 TrainingStep
功能:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path=model_path,
role=role,
sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
)
train_args = xgb_train.fit(
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv",
),
},
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="ChurnModelTrain",
step_args=train_args,
)
定义评估脚本和模型评估步骤
运行以下代码块来评估训练后的模型。 该脚本封装了检查 AUC 分数是否满足指定阈值的逻辑。
%%writefile pipelines/customerchurn/evaluate.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":
#Read Model Tar File
model_path = f"/opt/ml/processing/model/model.tar.gz"
with tarfile.open(model_path) as tar:
tar.extractall(path=".")
model = pickle.load(open("xgboost-model", "rb"))
#Read Test Data using which we evaluate the model
test_path = "/opt/ml/processing/test/test.csv"
df = pd.read_csv(test_path, header=None)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = xgboost.DMatrix(df.values)
#Run Predictions
predictions = model.predict(X_test)
#Evaluate Predictions
fpr, tpr, thresholds = roc_curve(y_test, predictions)
auc_score = auc(fpr, tpr)
report_dict = {
"classification_metrics": {
"auc_score": {
"value": auc_score,
},
},
}
#Save Evaluation Report
output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
f.write(json.dumps(report_dict))
接下来,运行以下代码块以实例化处理器和流水线步骤以运行评估脚本。 因为评估脚本使用 XGBoost 包,所以您使用 ScriptProcessor
连同 XGBoost 图像。 管道 ProcessingStep
函数采用以下参数:处理器、原始数据集的输入 S3 位置以及用于保存已处理数据集的输出 S3 位置。
#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-churn-eval",
role=role,
sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model",
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
destination=f"s3://{default_bucket}/output/evaluation"),
],
code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
name="ChurnEvalModel",
step_args=eval_args,
property_files=[evaluation_report],
)
定义创建模型步骤
运行以下代码块以使用管道模型步骤创建 SageMaker 模型。 此步骤利用训练步骤的输出来打包模型以进行部署。 请注意,实例类型参数的值是使用您在文章前面定义的 Pipelines 参数传递的。
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
step_create_model = ModelStep(
name="ChurnCreateModel",
step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)
定义批量转换步骤
运行以下代码块,使用经过训练的模型和第一步中创建的批量输入来运行批量转换:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/ChurnTransform",
sagemaker_session=pipeline_session
)
step_transform = TransformStep(
name="ChurnTransform",
step_args=transformer.transform(
data=batch_data,
content_type="text/csv"
)
)
定义寄存器模型步骤
以下代码使用管道模型步骤在 SageMaker 模型注册表中注册模型:
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json",
)
)
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
定义一个失败步骤来停止管道
以下代码定义了 Pipelines fail 步骤,如果 AUC 分数未达到定义的阈值,则停止管道运行并显示错误消息:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
name="ChurnAUCScoreFail",
error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
)
定义一个条件步骤来检查 AUC 分数
以下代码定义了一个条件步骤来检查 AUC 分数并有条件地创建模型并运行批量转换并在模型注册表中注册模型,或者停止在失败状态下运行的管道:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
left=JsonGet(
step_name=step_eval.name,
property_file=evaluation_report,
json_path="classification_metrics.auc_score.value",
),
right=auc_score_threshold,
)
step_cond = ConditionStep(
name="CheckAUCScoreChurnEvaluation",
conditions=[cond_lte],
if_steps=[step_register, step_create_model, step_transform],
else_steps=[step_fail],
)
构建并运行管道
定义完所有组件步骤后,您可以将它们组装成一个 Pipelines 对象。 您无需指定流水线的顺序,因为流水线会根据步骤之间的依赖关系自动推断顺序。
import json
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
processing_instance_type,
training_instance_type,
model_approval_status,
input_data,
batch_data,
auc_score_threshold,
],
steps=[step_process, step_train, step_eval, step_cond],
)
definition = json.loads(pipeline.definition())
print(definition)
在笔记本的单元格中运行以下代码。 如果管道已存在,则代码会更新管道。 如果管道不存在,它会创建一个新管道。
pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution
结论
在这篇文章中,我们介绍了 Pipelines 现在提供的一些新功能以及其他内置的 SageMaker 功能和 XGBoost 算法,用于开发、迭代和部署客户流失预测模型。 该解决方案可以通过其他数据源进行扩展
实现您自己的 ML 工作流程。 有关 Pipelines 工作流程中可用步骤的更多详细信息,请参阅 Amazon SageMaker 模型构建管道 和 SageMaker 工作流程。 该 AWS SageMaker 示例 GitHub 存储库有更多关于使用管道的各种用例的示例。
作者简介
彭杰瑞 是 AWS SageMaker 的软件开发工程师。 他专注于构建端到端的大规模 MLOps 系统,从培训到生产中的模型监控。 他还热衷于将 MLOps 的概念带给更广泛的受众。
德文奇 是 AWS 的一名软件开发工程师。 她目前专注于开发和改进 SageMaker Pipelines。 工作之余,她喜欢练习大提琴。
加亚特里·加纳科塔 是 AWS 专业服务的高级机器学习工程师。 她热衷于在各个领域开发、部署和解释 AI/ML 解决方案。 在担任此职务之前,她曾在金融和零售领域的全球顶级公司担任数据科学家和 ML 工程师,领导多项计划。 她拥有科罗拉多大学博尔德分校数据科学专业的计算机科学硕士学位。
鲁宾德·格鲁瓦尔 是 AWS 的高级人工智能/机器学习专家解决方案架构师。 他目前专注于在 SageMaker 上提供模型和 MLOps。 在担任此职务之前,他曾担任机器学习工程师构建和托管模型。 工作之余,他喜欢打网球和在山路上骑自行车。
李雷 是 AWS 专业服务的高级数据科学家。 他的专长专注于为从初创公司到企业组织的各种规模的客户构建和实施 AI/ML 解决方案。 工作之余,Ray 喜欢健身和旅行。