使用 Amazon SageMaker 编排基于 Ray 的机器学习工作流程 | 亚马逊网络服务

使用 Amazon SageMaker 编排基于 Ray 的机器学习工作流程 | 亚马逊网络服务

随着客户试图解决越来越具有挑战性的问题,机器学习 (ML) 变得越来越复杂。 这种复杂性通常导致需要分布式机器学习,其中使用多台机器来训练单个模型。 尽管这使得跨多个节点的任务并行化成为可能,从而缩短训练时间、增强可扩展性并提高性能,但有效使用分布式硬件仍面临重大挑战。 数据科学家必须解决数据分区、负载平衡、容错和可扩展性等挑战。 机器学习工程师必须手动处理并行化、调度、故障和重试,需要复杂的基础设施代码。

在这篇文章中,我们讨论使用的好处 射线亚马逊SageMaker 分布式机器学习,并提供有关如何使用这些框架构建和部署可扩展机器学习工作流程的分步指南。

Ray 是一个开源分布式计算框架,为 ML 模型的分布式训练和服务提供了灵活的框架。 它通过简单、可扩展的库抽象出低级分布式系统细节,用于常见的 ML 任务,例如数据预处理、分布式训练、超参数调整、强化学习和模型服务。

SageMaker 是一项完全托管的服务,用于构建、训练和部署 ML 模型。 Ray 与 SageMaker 功能无缝集成,可构建和部署高效可靠的复杂 ML 工作负载。 Ray 和 SageMaker 的组合为可扩展的 ML 工作流程提供了端到端功能,并具有以下突出功能:

  • Ray 中的分布式参与者和并行结构简化了分布式应用程序的开发。
  • Ray AI Runtime (AIR) 减少了从开发到生产的摩擦。 借助 Ray 和 AIR,相同的 Python 代码可以从笔记本电脑无缝扩展到大型集群。
  • SageMaker 的托管基础​​设施以及处理作业、训练作业和超参数调整作业等功能可以使用底层的 Ray 库进行分布式计算。
  • Amazon SageMaker实验 允许快速迭代和跟踪试验。
  • Amazon SageMaker功能商店 提供可扩展的存储库,用于存储、检索和共享模型训练的 ML 功能。
  • 训练后的模型可以存储、版本化和跟踪 Amazon SageMaker 模型注册表 用于治理和管理。
  • Amazon SageMaker管道 允许将从数据准备和训练到模型部署的端到端机器学习生命周期编排为自动化工作流程。

解决方案概述

本文重点介绍结合使用 Ray 和 SageMaker 的好处。 我们建立了一个基于 Ray 的端到端 ML 工作流程,并使用 SageMaker Pipelines 进行编排。 该工作流程包括使用 Ray actor 将数据并行摄取到特征存储中、使用 Ray Data 进行数据预处理、使用 Ray Train 和超参数优化 (HPO) 调整作业大规模训练模型和超参数调整,最后进行模型评估并将模型注册到模型中。模型注册表。

对于我们的数据,我们使用 合成住房数据集 由八个特征组成(YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCHDECK),我们的模型将预测 PRICE 在这所房子里面。

机器学习工作流程中的每个阶段都分为离散的步骤,并有自己的脚本接受输入和输出参数。 在下一节中,我们将重点介绍每个步骤的关键代码片段。 完整的代码可以在 aws-samples-for-ray GitHub 存储库.

先决条件

要使用 SageMaker Python SDK 并运行与本文相关的代码,您需要满足以下先决条件:

将数据提取到 SageMaker Feature Store 中

ML 工作流程的第一步是读取源数据文件 亚马逊简单存储服务 (Amazon S3) 以 CSV 格式并将其提取到 SageMaker Feature Store 中。 SageMaker Feature Store 是一个专门构建的存储库,使团队可以轻松创建、共享和管理 ML 功能。 它简化了功能发现、重用和共享,从而加快开发速度、增强客户团队内部的协作并降低成本。

将特征提取到特征存储中包含以下步骤:

  1. 定义特征组并在特征存储中创建特征组。
  2. 通过为每行数据添加事件时间和记录 ID 来准备要素存储的源数据。
  3. 使用Boto3 SDK将准备好的数据提取到功能组中。

在本节中,我们只重点介绍步骤 3,因为这是涉及使用 Ray 并行处理摄取任务的部分。 您可以在中查看此过程的完整代码 GitHub回购.

摄取特征 方法是在一个名为的类中定义的 Featurestore。 请注意 Featurestore 类装饰有 @ray.remote。 这表明此类的实例是 Ray actor,Ray 中的有状态并发计算单元。 它是一种编程模型,允许您创建维护内部状态的分布式对象,并且可以由 Ray 集群中不同节点上运行的多个任务同时访问。 Actor 提供了一种管理和封装可变状态的方法,这使得它们对于在分布式环境中构建复杂的有状态应用程序很有价值。 您也可以在参与者中指定资源需求。 在这种情况下,每个实例 FeatureStore 类将需要 0.5 个 CPU。 请看下面的代码:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

您可以通过调用与演员互动 remote 操作员。 在以下代码中,所需的参与者数量作为输入参数传递给脚本。 然后,数据根据参与者的数量进行分区,并传递到远程并行进程以摄取到特征存储中。 您可以致电 get 在对象 ref 上阻止当前任务的执行,直到远程计算完成并且结果可用。 当结果出来时, ray.get 将返回结果,并且当前任务将继续执行。

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

准备用于训练、验证和测试的数据

在此步骤中,我们使用 Ray Dataset 有效地分割、转换和缩放数据集,为机器学习做好准备。 Ray Dataset 提供了将分布式数据加载到 Ray 中的标准方式,支持各种存储系统和文件格式。 它具有用于常见 ML 数据预处理操作的 API,例如并行转换、混洗、分组和聚合。 Ray Dataset 还处理需要状态设置和 GPU 加速的操作。 它与 Spark、Pandas、NumPy 等其他数据处理库以及 TensorFlow 和 PyTorch 等 ML 框架顺利集成。 这允许在 Ray 之上构建端到端数据管道和机器学习工作流程。 目标是让从业者和研究人员更轻松地进行分布式数据处理和机器学习。

让我们看一下执行此数据预处理的脚本部分。 我们首先从特征存储中加载数据:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

然后,我们使用可用的更高级别的抽象来分割和缩放数据 ray.data 图书馆:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

处理后的训练、验证和测试数据集存储在 Amazon S3 中,并将作为输入参数传递到后续步骤。

执行模型训练和超参数优化

数据经过预处理并准备好建模后,就可以训练一些 ML 模型并微调其超参数,以最大限度地提高预测性能。 我们用 XGBoost-射线,基于 Ray 构建的 XGBoost 分布式后端,支持使用多个节点和 GPU 在大型数据集上训练 XGBoost 模型。 它为 XGBoost 的训练和预测 API 提供简单的直接替代品,同时处理分布式数据管理和训练的复杂性。

为了能够在多个节点上分配训练,我们使用一个名为的帮助器类 雷助手。 如下代码所示,我们使用训练作业的资源配置,选择第一台主机作为头节点:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

我们可以使用主机信息来决定如何在每个训练作业实例上初始化 Ray:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

当训练作业开始时,可​​以通过调用以下函数来初始化 Ray 集群: start_ray() 实例的方法 RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

然后我们使用 XGBoost-Ray 的 XGBoost 训练器进行训练:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

请注意,在实例化时 trainer,我们通过 RayParams,它获取参与者数量和每个参与者的 CPU 数量。 XGBoost-Ray 使用此信息在连接到 Ray 集群的所有节点上分配训练。

我们现在基于 SageMaker Python SDK 创建一个 XGBoost 估计器对象,并将其用于 HPO 作业。

使用 SageMaker Pipelines 编排上述步骤

为了构建端到端可扩展且可重用的机器学习工作流程,我们需要使用 CI/CD 工具将上述步骤编排到管道中。 SageMaker Pipelines 与 SageMaker、SageMaker Python SDK 和 SageMaker Studio 直接集成。 通过这种集成,您可以使用易于使用的 Python SDK 创建机器学习工作流程,然后使用 SageMaker Studio 可视化和管理您的工作流程。 您还可以跟踪管道执行中的数据历史记录并指定缓存步骤。

SageMaker Pipelines 创建有向非循环图 (DAG),其中包括构建 ML 工作流程所需的步骤。 每个管道都是由步骤之间的数据依赖性编排的一系列互连步骤,并且可以参数化,允许您提供输入变量作为管道每次运行的参数。 SageMaker Pipelines 有四种类型的管道参数: ParameterString, ParameterInteger, ParameterFloatParameterBoolean。 在本节中,我们参数化一些输入变量并设置步骤缓存配置:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

我们定义两个处理步骤:一个用于 SageMaker Feature Store 摄取,另一个用于数据准备。 这看起来应该与前面描述的步骤非常相似。 唯一的新代码行是 ProcessingStep 在步骤定义之后,这允许我们采用处理作业配置并将其包含为管道步骤。 我们进一步指定数据准备步骤对 SageMaker Feature Store 摄取步骤的依赖性。 请看下面的代码:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

同样,要构建模型训练和调整步骤,我们需要添加以下定义 TuningStep 在模型训练步骤的代码之后,允许我们将 SageMaker 超参数调整作为管道中的一个步骤来运行:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

调整步骤完成后,我们选择将最佳模型注册到 SageMaker 模型注册表中。 为了控制模型质量,我们实现了一个最低质量门,它将最佳模型的目标指标 (RMSE) 与定义为管道输入参数的阈值进行比较 rmse_threshold。 为了进行此评估,我们创建另一个处理步骤来运行 评估脚本。 模型评估结果将存储为属性文件。 在分析处理步骤的结果以决定如何运行其他步骤时,属性文件特别有用。 请看下面的代码:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

我们定义一个 ModelStep 将最佳模型注册到我们管道中的 SageMaker 模型注册表中。 如果最佳模型未通过我们预定的质量检查,我们还会指定一个 FailStep 输出错误消息:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

接下来,我们使用一个 ConditionStep 评估下一步是否应采取模型注册步骤或失败步骤。 在我们的例子中,如果 RMSE 分数低于阈值,则最佳模型将被注册。

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

最后,我们将所有定义的步骤编排到管道中:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

上述管道可以直接在SageMaker Studio中可视化并执行,也可以通过调用来执行 execution = training_pipeline.start()。 下图说明了管道流程。

SageMaker 管道 DAG

此外,我们可以查看管道执行生成的工件的沿袭。

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

部署模型

通过管道运行在 SageMaker 模型注册表中注册最佳模型后,我们使用 SageMaker 的完全托管模型部署功能将模型部署到实时端点。 SageMaker 还有其他模型部署选项来满足不同用例的需求。 详情请参阅 部署模型进行推理 在为您的用例选择正确的选项时。 首先,让我们在 SageMaker 模型注册表中注册模型:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

该模型的当前状态是 PendingApproval。 我们需要将其状态设置为 Approved 部署之前:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

清理

实验完成后,记得清理资源,避免产生不必要的费用。 要进行清理,请通过调用 API 删除实时端点、模型组、管道和功能组 删除端点, 删除模型包组, 删除管道删除要素组分别,并关闭所有 SageMaker Studio 笔记本实例。

结论

本文演示了如何使用 SageMaker Pipelines 编排基于 Ray 的 ML 工作流程的分步演练。 我们还演示了 SageMaker Pipelines 与第三方 ML 工具集成的功能。 有多种 AWS 服务以可扩展且安全的方式支持 Ray 工作负载,以确保卓越的性能和运营效率。 现在,轮到您探索这些强大的功能并开始使用 Amazon SageMaker Pipelines 和 Ray 优化您的机器学习工作流程。 立即采取行动,释放您的机器学习项目的全部潜力!


关于作者

使用 Amazon SageMaker 编排基于 Ray 的机器学习工作流程 | 亚马逊网络服务柏拉图区块链数据智能。 垂直搜索。 人工智能。拉朱·兰甘 是 Amazon Web Services (AWS) 的高级解决方案架构师。 他与政府资助的实体合作,帮助他们使用 AWS 构建 AI/ML 解决方案。 当不修改云解决方案时,您会看到他与家人一起出去玩,或者在与朋友进行一场热闹的羽毛球比赛中抓到小鸟。

使用 Amazon SageMaker 编排基于 Ray 的机器学习工作流程 | 亚马逊网络服务柏拉图区块链数据智能。 垂直搜索。 人工智能。丁雪莉 是 Amazon Web Services (AWS) 的高级 AI/ML 专家解决方案架构师。 她在机器学习方面拥有丰富的经验,并拥有计算机科学博士学位。 她主要与公共部门客户合作解决各种与 AI/ML 相关的业务挑战,帮助他们加快 AWS 云上的机器学习之旅。 当不帮助顾客时,她喜欢户外活动。

时间戳记:

更多来自 AWS机器学习