Amazon SageMaker Pipelines で PySpark を使用して安全な処理ジョブを実行する

Amazon SageMaker Pipelines で PySpark を使用して安全な処理ジョブを実行する

Amazon SageMakerスタジオ モデルの構築、トレーニング、デバッグ、デプロイ、監視、および機械学習 (ML) ワークフローの管理に役立ちます。 AmazonSageMakerパイプライン を構築することができます 安全でスケーラブルで柔軟な MLOps プラットフォーム スタジオ内。

この投稿では、パイプライン内で PySpark 処理ジョブを実行する方法について説明します。 これにより、パイプラインを使用してモデルをトレーニングしたい人は誰でも、トレーニング データを前処理したり、推論データを後処理したり、PySpark を使用してモデルを評価したりできます。 この機能は、大規模なデータを処理する必要がある場合に特に役立ちます。 さらに、構成と Spark UI ログを使用して PySpark のステップを最適化する方法を紹介します。

パイプラインは アマゾンセージメーカー エンドツーエンドの ML パイプラインを構築および管理するためのツール。 これは、SageMaker やその他の AWS サービスと統合された完全マネージド型のオンデマンド サービスであるため、リソースを作成して管理します。 これにより、パイプラインの実行時にのみインスタンスがプロビジョニングされ、使用されるようになります。 さらに、パイプラインは SageMaker Python SDK、追跡できるようにします データ系統 および ステップを再利用する それらをキャッシュして、開発時間とコストを軽減します。 SageMaker パイプラインは使用できます 処理手順 データを処理したり、モデルの評価を実行したりします。

大規模なデータを処理する場合、データ サイエンティストや ML エンジニアはよく パイスパーク、のインターフェース Apache Spark パイソンで。 SageMaker は、Spark フレームワークを使用したデータ変換や機能エンジニアリングなど、分散データ処理ジョブを実行するために必要な PySpark やその他の依存関係を含むビルド済みの Docker イメージを提供します。 これらのイメージを使用すると、ジョブの処理で PySpark の使用をすぐに開始できますが、大規模なデータ処理では、SageMaker によって作成されたクラスターの分散コンピューティングを最適化するために、特定の Spark 構成が必要になることがよくあります。

この例では、単一の処理ステップを実行する SageMaker パイプラインを作成します。 パイプラインに追加できるその他のステップの詳細については、次を参照してください。 パイプラインの手順.

SageMaker 処理ライブラリ

SageMaker Processing は特定の環境で実行できます フレームワーク (たとえば、SKlearnProcessor、PySparkProcessor、Hugging Face など)。 使用されるフレームワークに関係なく、それぞれ 処理ステップ 以下が必要です。

  • ステップ名 – SageMaker パイプラインステップに使用する名前
  • ステップ引数 – あなたの議論 ProcessingStep

さらに、次のものを提供できます。

  • SageMaker パイプラインで不要なステップの実行を回避するためのステップ キャッシュの設定
  • ステップ名、ステップ インスタンス、またはステップ コレクション インスタンスのリスト。 ProcessingStep に依存します
  • の表示名 ProcessingStep
  • の説明 ProcessingStep
  • プロパティファイル
  • ポリシーの再試行

引数はに渡されます ProcessingStep。 あなたが使用することができます sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor クラスを使用して、処理ジョブ内で Spark アプリケーションを実行します。

各プロセッサには、フレームワークに応じて独自のニーズがあります。 これは、 PySparkProcessorを最適化するための追加情報を渡すことができます。 ProcessingStep さらに、たとえば configuration ジョブを実行するときのパラメーター。

安全な環境で SageMaker Processing ジョブを実行する

信じられないほどシンプル ベストプラクティス プライベート Amazon VPC を作成し、パブリック インターネット経由でジョブにアクセスできないように設定します。 SageMaker Processing ジョブを使用すると、VPC でプライベートサブネットとセキュリティグループを指定できるほか、 NetworkConfig.VpcConfig のリクエストパラメータ CreateProcessingJob API。 を使用して、この構成の例を示します。 SageMaker SDK 次のセクションで説明します。

SageMaker パイプライン内の PySpark ProcessingStep

この例では、VPC、VPC エンドポイント、セキュリティ グループ、 AWS IDおよびアクセス管理 (IAM) ロール、および AWSキー管理サービス (AWS KMS) キー。 また、XNUMX つのバケットがあると仮定します。XNUMX つはコードやログなどのアーティファクト用で、もう XNUMX つはデータ用です。 の Basic_infra.yaml ファイルは例を提供します AWS CloudFormation 必要な前提インフラストラクチャをプロビジョニングするためのコード。 サンプル コードと導入ガイドは、次のサイトでも入手できます。 GitHubの.

例として、単一のパイプラインを含むパイプラインをセットアップします。 ProcessingStep 単に読み書きしているだけです アワビのデータセット スパークを使用。 コード サンプルは、 ProcessingStep.

パイプラインのパラメーター (名前、ロール、バケットなど) とステップ固有の設定 (インスタンスの種類と数、フレームワークのバージョンなど) を定義します。 この例では、安全なセットアップを使用し、サブネット、セキュリティ グループ、およびコンテナー間トラフィックの暗号化も定義します。 この例では、SageMaker フルアクセスと VPC を持つパイプライン実行ロールが必要です。 次のコードを参照してください。

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

実証するために、次のコード例では、パイプライン内の SageMaker Processing で PySpark スクリプトを実行します。 PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

前のコードに示されているように、提供することでデフォルトの Spark 構成を上書きしています。 configuration.json として ProcessingInput. 私たちは configuration.json に保存されたファイル Amazon シンプル ストレージ サービス (Amazon S3) 以下の設定:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

ファイルを ProcessingInput または、実行時に構成引数を使用して run() 機能。

Spark 構成は、処理ジョブ用に選択されたインスタンス タイプやインスタンス数など、他のオプションに依存します。 最初の考慮事項は、インスタンスの数、それらの各インスタンスが持つ vCPU コア、およびインスタンス メモリです。 使用できます Spark UI or CloudWatch インスタンスのメトリクス およびログを記録して、複数回の実行反復でこれらの値を調整します。

さらに、エグゼキュータとドライバの設定をさらに最適化できます。 これらの計算方法の例については、次を参照してください。 Amazon EMR で Apache Spark アプリケーションのメモリを適切に管理するためのベストプラクティス.

次に、ドライバーとエグゼキューターの設定については、コミッターの設定を調査して、Amazon S3 への書き込み時のパフォーマンスを向上させることをお勧めします。 私たちの場合、Parquet ファイルを Amazon S3 に書き込み、「spark.sql.parquet.fs.optimized.comitter.optimization-enabled」からtrueに。

Amazon S3 への接続に必要な場合は、リージョン エンドポイント「spark.hadoop.fs.s3a.endpoint」は、構成ファイル内で指定できます。

このパイプラインの例では、PySpark スクリプト spark_process.py (次のコードに示すように) CSV ファイルを Amazon S3 から Spark データ フレームにロードし、データを Parquet として Amazon S3 に保存します。

アワビ データセットの読み取りと書き込みは XNUMX つのインスタンスのデフォルト設定で実行できるため、構成例はワークロードに比例していないことに注意してください。 前述の構成は、特定のニーズに基づいて定義する必要があります。

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Spark 処理ジョブの最適化に飛び込むには、CloudWatch ログと Spark UI を使用できます。 SageMaker ノートブック インスタンスで処理ジョブを実行することにより、Spark UI を作成できます。 あなたは見ることができます パイプライン内で実行される処理ジョブの Spark UI by 履歴サーバーの実行 Spark UI ログが同じ Amazon S3 の場所に保存されている場合は、SageMaker ノートブック インスタンス内。

クリーンアップ

チュートリアルに従った場合は、使用されなくなったリソースを削除して、課金を停止することをお勧めします。 必ず CloudFormationスタックを削除します リソースの作成に使用したもの。 これにより、作成されたスタックとそれによって作成されたリソースが削除されます。

まとめ

この投稿では、SageMaker Pipelines 内で PySpark を使用して安全な SageMaker Processing ジョブを実行する方法を示しました。 また、Spark 構成を使用して PySpark を最適化し、安全なネットワーク構成で実行するように処理ジョブを設定する方法も示しました。

次のステップとして、モデルのライフサイクル全体を自動化する方法と、その方法を探ります。 お客様は、安全でスケーラブルな MLOps プラットフォームを構築しました SageMaker サービスの使用。


著者について

Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.マレン・スイルマン のデータサイエンティストです AWSプロフェッショナルサービス. 彼女はさまざまな業界の顧客と協力して、ビジネスの成果を達成するための AI/ML の力を明らかにしています。 Maren は 2019 年 XNUMX 月から AWS に勤務しています。余暇には、キックボクシング、素晴らしい景色へのハイキング、ボードゲームナイトを楽しんでいます。


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.マイラ・ラデイラ・タンケ
AWS の ML スペシャリストです。 データ サイエンスのバックグラウンドを持つ彼女は、さまざまな業界の顧客と共に ML アプリケーションの設計と構築に 9 年間携わってきました。 彼女はテクニカル リーダーとして、新しいテクノロジと革新的なソリューションを通じて、顧客がビジネス価値の達成を加速するのを支援しています。 マイラは自由な時間に旅行を楽しんだり、暖かい場所で家族と過ごしたりしています。


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.ポーリン・ティン
のデータサイエンティストです AWSプロフェッショナルサービス チーム。 彼女は、AI/ML ソリューションを開発することにより、顧客がビジネス成果を達成し、加速するのをサポートしています。 余暇には、旅行、サーフィン、新しいデザートの店への挑戦を楽しんでいます。


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.ドナルド・フォスーオ
のシニア データ アーキテクトです。 AWSプロフェッショナルサービス 主に Global Finance Service と連携しています。 彼は顧客と協力して、顧客のビジネス上の問題に対処し、AWS サービスの採用を加速する革新的なソリューションを作成しています。 余暇には、ドナルドは読書、ランニング、旅行を楽しんでいます。

タイムスタンプ:

より多くの AWS機械学習