使用 Amazon SageMaker Pipelines 构建机器学习工作流程的最佳实践和设计模式 | 亚马逊网络服务

使用 Amazon SageMaker Pipelines 构建机器学习工作流程的最佳实践和设计模式 | 亚马逊网络服务

Amazon SageMaker管道 是一项完全托管的 AWS 服务,用于构建和编排机器学习 (ML) 工作流程。 SageMaker Pipelines 使 ML 应用程序开发人员能够编排 ML 工作流程的不同步骤,包括数据加载、数据转换、训练、调整和部署。 您可以使用 SageMaker Pipelines 在 SageMaker 中编排 ML 作业及其 与更大的 AWS 生态系统集成 还允许您使用诸如 AWS Lambda 功能, 亚马逊电子病历 工作等等。 这使您能够针对 ML 工作流程中的特定要求构建自定义且可重复的管道。

在这篇文章中,我们提供了一些最佳实践,以最大限度地发挥 SageMaker Pipelines 的价值并提供无缝的开发体验。 我们还讨论了构建 SageMaker Pipelines 时的一些常见设计场景和模式,并提供了解决这些问题的示例。

SageMaker 管道的最佳实践

在本节中,我们将讨论使用 SageMaker Pipelines 设计工作流程时可以遵循的一些最佳实践。 采用它们可以改进开发流程并简化 SageMaker Pipelines 的运营管理。

使用 Pipeline Session 延迟加载管道

管道会议 启用管道资源的延迟初始化(直到管道运行时才会启动作业)。 这 PipelineSession 上下文继承 SageMaker 会议 并实现与其他 SageMaker 实体和资源交互的便捷方法,例如训练作业、端点、输入数据集 亚马逊简单存储服务 (亚马逊 S3)等等。 定义 SageMaker Pipelines 时,您应该使用 PipelineSession 在常规 SageMaker 会话中:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

在本地模式下运行管道,以在开发过程中实现经济高效且快速的迭代

您可以运行 本地模式下的管道 使用 LocalPipelineSession 语境。 在此模式下,管道和作业使用本地计算机上的资源(而不是 SageMaker 托管资源)在本地运行。 本地模式提供了一种经济高效的方法,可以使用较小的数据子集迭代管道代码。 管道在本地测试后,可以扩展以使用 管道会话 上下文。

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

通过版本控制管理 SageMaker 管道

工件和管道定义的版本控制是开发生命周期中的常见要求。 您可以通过使用唯一的前缀或后缀(最常见的是时间戳)命名管道对象来创建管道的多个版本,如以下代码所示:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

通过与 SageMaker Experiments 集成来组织和跟踪 SageMaker 管道运行

SageMaker Pipelines 可以轻松集成 SageMaker实验 用于组织和 跟踪管道运行情况。 这是通过指定来实现的 管道实验配置 在创建时 管道对象。 使用此配置对象,您可以指定实验名称和试验名称。 SageMaker 管道的运行详细信息根据指定的实验和试验进行组织。 如果您没有显式指定实验名称,则实验名称将使用管道名称。 同样,如果您没有显式指定试验名称,则管道运行 ID 将用作试验或运行组名称。 请看下面的代码:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

在私有 VPC 中安全运行 SageMaker 管道

为了保护 ML 工作负载,最佳实践是在私有 VPC、私有子网和安全组内的安全网络配置中部署由 SageMaker Pipelines 编排的作业。 为了确保并强制使用此安全环境,您可以实施以下操作 AWS身份和访问管理 (IAM) 政策 SageMaker 执行角色 (这是管道在运行期间承担的角色)。 您还可以添加策略以在网络隔离模式下运行 SageMaker Pipelines 编排的作业。

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

有关已实施这些安全控制的管道实施示例,请参阅 在安全的环境中使用 Amazon SageMaker 编排作业、模型注册和持续部署.

使用标签监控管道运行的成本

单独使用 SageMaker 管道是免费的; 您需要为作为处理、训练和批量推理等各个管道步骤的一部分而启动的计算和存储资源付费。 要汇总每个管道运行的成本,您可以包括 标签 在创建资源的每个管道步骤中。 然后可以在成本浏览器中引用这些标签来过滤和聚合管道运行总成本,如下例所示:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

从成本浏览器中,您现在可以获取按标签过滤的成本:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

一些常见场景的设计模式

在本节中,我们将讨论 SageMaker Pipelines 的一些常见用例的设计模式。

使用 Lambda 步骤运行轻量级 Python 函数

Python 函数在 ML 工作流程中无处不在; 它们用于预处理、后处理、评估等。 Lambda 是一种无服务器计算服务,让您无需配置或管理服务器即可运行代码。 借助 Lambda,您可以使用您的首选语言(包括 Python)运行代码。 您可以使用它来运行自定义 Python 代码作为管道的一部分。 Lambda 步 使您能够将 Lambda 函数作为 SageMaker 管道的一部分运行。 从以下代码开始:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

使用创建 Lambda 函数 SageMaker Python SDK 的 Lambda 帮助程序:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

调用 Lambda 步骤:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

在步骤之间传递数据

管道步骤的输入数据可以是可访问的数据位置,也可以是管道中先前步骤之一生成的数据。 您可以将此信息作为 ProcessingInput 范围。 让我们看一下如何使用ProcessingInput 的几个场景。

场景 1:将 Lambda 步骤的输出(原始数据类型)传递到处理步骤

原始数据类型是指标量数据类型,例如字符串、整数、布尔值和浮点数。

以下代码片段定义了一个 Lambda 函数,该函数返回具有原始数据类型的变量字典。 当从 SageMaker 管道中的 Lambda 步骤调用时,您的 Lambda 函数代码将返回键值对的 JSON。

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

在管道定义中,您可以定义特定数据类型的 SageMaker 管道参数,并将变量设置为 Lambda 函数的输出:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

场景 2:将 Lambda 步骤的输出(非原始数据类型)传递到处理步骤

非原始数据类型是指非标量数据类型(例如, NamedTuple)。 您可能会遇到必须从 Lambda 函数返回非原始数据类型的情况。 为此,您必须将非原始数据类型转换为字符串:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

然后,您可以使用该字符串作为管道中后续步骤的输入。 要在代码中使用命名元组,请使用 eval() 解析字符串中的Python表达式:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

场景 3:通过属性文件传递步骤的输出

您还可以将处理步骤的输出存储在 属性 JSON 文件 用于下游消费 ConditionStep 或其他 ProcessingStep。 您可以使用 JSONGet函数 查询一个 属性文件。 请参见以下代码:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

我们假设属性文件的内容如下:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

在这种情况下,可以使用 JsonGet 函数查询特定值并在后续步骤中使用:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

在管道定义中参数化变量

通常需要对变量进行参数化,以便可以在运行时使用它们,例如构造 S3 URI。 您可以参数化一个字符串,以便在运行时使用 Join 功能。 下面的代码片段展示了如何使用 Join 函数并使用它来设置处理步骤中的输出位置:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

在可迭代对象上运行并行代码

一些 ML 工作流程在一组静态项目上并行 for 循环运行代码( 可迭代的)。 它可以是在不同数据上运行的相同代码,也可以是需要为每个项目运行的不同代码段。 例如,如果文件中有大量行并且想要加快处理时间,则可以依赖前一种模式。 如果要对数据中的特定子组执行不同的转换,您可能必须为数据中的每个子组运行不同的代码段。 以下两个场景说明了如何为此目的设计 SageMaker 管道。

场景一:对数据的不同部分实现处理逻辑

您可以使用多个实例运行处理作业(通过设置 instance_count 为大于 1 的值)。 这会将来自 Amazon S3 的输入数据分发到所有处理实例中。 然后,您可以使用脚本 (process.py) 根据实例编号和项目列表中的相应元素处理数据的特定部分。 process.py 中的编程逻辑可以编写为根据其处理的项目列表运行不同的模块或代码段。 以下示例定义了可在ProcessingStep中使用的处理器:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

场景 2:运行一系列步骤

当您有一系列需要并行运行的步骤时,您可以将每个序列定义为独立的 SageMaker 管道。 然后,可以从属于 Lambda 函数一部分的 Lambda 函数触发这些 SageMaker 管道的运行。 LambdaStep 在父管道中。 以下代码段说明了触发两个不同的 SageMaker 管道运行的场景:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

结论

在这篇文章中,我们讨论了有效使用和维护 SageMaker 管道的一些最佳实践。 我们还提供了您在使用 SageMaker Pipelines 设计工作流程时可以采用的某些模式,无论您是在创作新管道还是从其他编排工具迁移 ML 工作流程。 要开始使用 SageMaker Pipelines 进行 ML 工作流程编排,请参阅 GitHub 上的代码示例Amazon SageMaker 模型构建管道.


作者简介

使用 Amazon SageMaker Pipelines 构建机器学习工作流程的最佳实践和设计模式 |亚马逊网络服务柏拉图区块链数据智能。垂直搜索。人工智能。皮纳克帕尼格拉希 与客户合作构建机器学习驱动的解决方案,以解决 AWS 上的战略业务问题。 当不忙于机器学习时,他会去远足、读书或观看体育比赛。

使用 Amazon SageMaker Pipelines 构建机器学习工作流程的最佳实践和设计模式 |亚马逊网络服务柏拉图区块链数据智能。垂直搜索。人工智能。Meenakshisundaram 坦达瓦拉扬 作为 AI/ML 专家在 AWS 工作。 他热衷于设计、创建和推广以人为本的数据和分析体验。 Meena 专注于开发可持续系统,为 AWS 的战略客户提供可衡量的竞争优势。 Meena 是一位联络人和设计思想家,致力于通过创新、孵化和民主化推动企业采用新的工作方式。

时间戳记:

更多来自 AWS机器学习