在 Amazon SageMaker Pipelines 中使用 PySpark 运行安全处理作业

在 Amazon SageMaker Pipelines 中使用 PySpark 运行安全处理作业

亚马逊SageMaker Studio 可以帮助您构建、训练、调试、部署和监控您的模型,并管理您的机器学习 (ML) 工作流程。 Amazon SageMaker管道 使您能够构建一个 安全、可扩展且灵活的 MLOps 平台 工作室内。

在这篇文章中,我们解释了如何在管道中运行 PySpark 处理作业。 这使得任何想要使用 Pipelines 训练模型的人都可以使用 PySpark 预处理训练数据、后处理推理数据或评估模型。 当您需要处理大规模数据时,此功能尤为重要。 此外,我们还展示了如何使用配置和 Spark UI 日志优化 PySpark 步骤。

管道是一个 亚马逊SageMaker 用于构建和管理端到端 ML 管道的工具。 它是一项完全托管的按需服务,与 SageMaker 和其他 AWS 服务集成,因此可以为您创建和管理资源。 这可确保仅在运行管道时配置和使用实例。 此外,管道由 SageMaker Python 开发工具包,让您追踪您的 数据沿袭重用步骤 通过缓存它们来减少开发时间和成本。 SageMaker 管道可以使用 处理步骤 处理数据或执行模型评估。

在处理大规模数据时,数据科学家和 ML 工程师经常使用 火花, 一个接口 Apache Spark 在 Python 中。 SageMaker 提供预构建的 Docker 映像,其中包括 PySpark 和运行分布式数据处理作业所需的其他依赖项,包括使用 Spark 框架的数据转换和特征工程。 尽管这些图像允许您在处理作业中快速开始使用 PySpark,但大规模数据处理通常需要特定的 Spark 配置,以优化 SageMaker 创建的集群的分布式计算。

在我们的示例中,我们创建了一个运行单个处理步骤的 SageMaker 管道。 有关可以添加到管道的其他步骤的更多信息,请参阅 流水线步骤.

SageMaker 处理库

SageMaker Processing 可以使用特定的 框架 (例如,SKlearnProcessor、PySparkProcessor 或 Hugging Face)。 独立于所使用的框架,每个 处理步骤 需要以下内容:

  • 步骤名称 – 用于 SageMaker 管道步骤的名称
  • 步骤参数 – 你的论点 ProcessingStep

此外,您还可以提供以下信息:

  • 步骤缓存的配置,以避免在 SageMaker 管道中不必要地运行您的步骤
  • 步骤名称、步骤实例或步骤集合实例的列表 ProcessingStep 依赖于取决于
  • 的显示名称 ProcessingStep
  • 的说明 ProcessingStep
  • 属性文件
  • 重试政策

论据移交给 ProcessingStep。 您可以使用 sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor 类在处理作业中运行您的 Spark 应用程序。

每个处理器都有自己的需求,具体取决于框架。 这是最好的说明使用 PySparkProcessor,您可以在其中传递其他信息以优化 ProcessingStep 进一步,例如通过 configuration 运行作业时的参数。

在安全环境中运行 SageMaker 处理作业

这是 最佳实践 创建私有 Amazon VPC 并对其进行配置,以便您的作业无法通过公共 Internet 访问。 SageMaker 处理作业允许您在 VPC 中指定私有子网和安全组,并使用 NetworkConfig.VpcConfig 的请求参数 CreateProcessingJob 应用程序接口。 我们使用以下方法提供此配置的示例 SageMaker 开发工具包 在下一节。

SageMaker 管道中的 PySpark ProcessingStep

对于此示例,我们假设您已将 Studio 部署在一个已经可用的安全环境中,包括 VPC、VPC 端点、安全组、 AWS身份和访问管理 (IAM) 角色,以及 AWS密钥管理服务 (AWS KMS) 密钥。 我们还假设您有两个桶:一个用于代码和日志等工件,另一个用于您的数据。 这 basic_infra.yaml 文件提供示例 AWS CloudFormation 提供必要的先决条件基础设施的代码。 示例代码和部署指南也可在 GitHub上.

例如,我们设置了一个包含单个 ProcessingStep 我们只是简单地阅读和编写 鲍鱼数据集 使用星火。 代码示例向您展示了如何设置和配置 ProcessingStep.

我们为管道定义参数(名称、角色、存储桶等)和特定于步骤的设置(实例类型和计数、框架版本等)。 在此示例中,我们使用安全设置并定义子网、安全组和容器间流量加密。 对于此示例,您需要一个具有 SageMaker 完全访问权限和 VPC 的管道执行角色。 请参见以下代码:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

为了进行演示,以下代码示例通过使用 PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

如前面的代码所示,我们通过提供覆盖默认的 Spark 配置 configuration.json 作为一个 ProcessingInput. 我们使用一个 configuration.json 保存在的文件 亚马逊简单存储服务 (Amazon S3) 具有以下设置:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

我们可以通过将文件作为 ProcessingInput 或者在运行时使用配置参数 run() 功能。

Spark 配置取决于其他选项,例如为处理作业选择的实例类型和实例计数。 首先要考虑的是实例数量、每个实例拥有的 vCPU 内核以及实例内存。 您可以使用 Spark 用户界面 or CloudWatch 实例指标 并记录以在多次运行迭代中校准这些值。

此外,还可以进一步优化执行程序和驱动程序设置。 有关如何计算这些的示例,请参阅 在 Amazon EMR 上成功管理 Apache Spark 应用程序内存的最佳实践.

接下来,对于驱动程序和执行程序设置,我们建议调查提交程序设置以提高写入 Amazon S3 时的性能。 在我们的例子中,我们将 Parquet 文件写入 Amazon S3 并设置“spark.sql.parquet.fs.optimized.comitter.optimization-enabled”为真。

如果需要连接到 Amazon S3,区域端点“spark.hadoop.fs.s3a.endpoint”可以在配置文件中指定。

在此示例管道中,PySpark 脚本 spark_process.py (如以下代码所示)从 Amazon S3 加载 CSV 文件到 Spark 数据帧,并将数据作为 Parquet 保存回 Amazon S3。

请注意,我们的示例配置与工作负载不成比例,因为可以在一个实例的默认设置下读取和写入鲍鱼数据集。 我们提到的配置应该根据您的具体需求来定义。

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

要深入研究优化 Spark 处理作业,您可以使用 CloudWatch 日志和 Spark UI。 您可以通过在 SageMaker 笔记本实例上运行处理作业来创建 Spark UI。 您可以查看 用于在管道中运行的处理作业的 Spark UI by 运行历史服务器 如果 Spark UI 日志保存在同一 Amazon S3 位置,则在 SageMaker 笔记本实例中。

清理

如果您按照本教程进行操作,最好删除不再使用的资源以停止产生费用。 确保 删除 CloudFormation 堆栈 你用来创建你的资源。 这将删除创建的堆栈及其创建的资源。

结论

在本文中,我们展示了如何在 SageMaker Pipelines 中使用 PySpark 运行安全的 SageMaker Processing 作业。 我们还演示了如何使用 Spark 配置优化 PySpark,以及如何设置您的处理作业以在安全网络配置中运行。

下一步,探索如何自动化整个模型生命周期以及如何 客户构建了安全且可扩展的 MLOps 平台 使用 SageMaker 服务。


作者简介

在 Amazon SageMaker Pipelines PlatoBlockchain 数据智能中使用 PySpark 运行安全处理作业。垂直搜索。人工智能。马伦·苏尔曼 是一名数据科学家 AWS专业服务. 她与各行各业的客户合作,展示 AI/ML 实现业务成果的强大功能。 Maren 自 2019 年 XNUMX 月以来一直在 AWS 工作。在业余时间,她喜欢跆拳道、远足欣赏美景和参加棋盘游戏之夜。


在 Amazon SageMaker Pipelines PlatoBlockchain 数据智能中使用 PySpark 运行安全处理作业。垂直搜索。人工智能。迈拉·拉德拉·坦克
是 AWS 的机器学习专家。 她拥有数据科学背景,在与各行各业的客户一起设计和构建 ML 应用程序方面拥有 9 年的经验。 作为技术主管,她帮助客户通过新兴技术和创新解决方案加速实现商业价值。 在空闲时间,Maira 喜欢旅行,喜欢在温暖的地方与家人共度时光。


在 Amazon SageMaker Pipelines PlatoBlockchain 数据智能中使用 PySpark 运行安全处理作业。垂直搜索。人工智能。丁宝琳
是数据科学家 AWS专业服务 团队。 她通过开发 AI/ML 解决方案支持客户实现并加速他们的业务成果。 在业余时间,Pauline 喜欢旅行、冲浪和尝试新的甜品店。


在 Amazon SageMaker Pipelines PlatoBlockchain 数据智能中使用 PySpark 运行安全处理作业。垂直搜索。人工智能。唐纳德福索
是一名高级数据架构师 AWS专业服务 团队,主要与 Global Finance Service 合作。 他与客户合作创建创新的解决方案来解决客户的业务问题并加速 AWS 服务的采用。 在业余时间,唐纳德喜欢阅读、跑步和旅行。

时间戳记:

更多来自 AWS机器学习