Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。

Amazon SageMaker Pipelines と Amazon SageMaker SDK の新機能

AmazonSageMakerパイプライン を使用すると、データ サイエンティストと機械学習 (ML) エンジニアはトレーニング ワークフローを自動化できます。これにより、繰り返し可能なプロセスを作成して、迅速な実験とモデルの再トレーニングのためのモデル開発ステップを調整できます。 データの準備、機能エンジニアリング、モデルのトレーニング、モデルの調整、モデルの検証など、モデル構築ワークフロー全体を自動化し、モデル レジストリにカタログ化できます。 定期的に、または特定のイベントがトリガーされたときに自動的に実行されるようにパイプラインを構成することも、必要に応じて手動で実行することもできます。

この記事では、 アマゾンセージメーカー SDK を開発し、Amazon SageMaker パイプラインの新機能を導入して、ML 実践者が ML モデルを簡単に構築およびトレーニングできるようにします。

Pipelines は開発者エクスペリエンスの革新を続けており、これらの最近のリリースでは、よりカスタマイズされた方法でサービスを使用できるようになりました。

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – ドキュメントの更新 PipelineVariable エスティメータ、プロセッサ、チューナー、トランスフォーマー、およびモデルの基本クラス、Amazon モデル、およびフレームワーク モデルの使用。 エスティメータとプロセッサのすべてのサブクラスをサポートするために、SDK の新しいバージョンでは追加の変更が行われる予定です。
  • 2.90.0 – の可用性 モデルステップ 統合モデル リソースの作成および登録タスク用。
  • 2.88.2 – の可用性 パイプライン セッション SageMaker エンティティおよびリソースとの管理された対話用。
  • 2.88.2 – サブクラスの互換性 ワークフロー パイプラインのジョブ ステップ そのため、パイプラインを使用しない場合と同様に、ジョブの抽象化を構築し、処理、トレーニング、変換、およびチューニングのジョブを構成および実行できます。
  • 2.76.0 – の可用性 フェイルステップ 失敗ステータスのパイプラインを条件付きで停止します。

この投稿では、パイプラインの新機能を実装する方法を示すために、モデルの構築と展開に焦点を当てたサンプル データセットを使用してワークフローを説明します。 最終的に、これらの新しい機能を正常に使用し、ML ワークロードを簡素化するのに十分な情報が得られます。

機能概要

Pipelines は、次の新機能を提供します。

  • パイプライン変数の注釈 – 特定のメソッド パラメータは、複数の入力タイプを受け入れます。 PipelineVariables、および追加のドキュメントが追加され、どこにあるかが明確になりました PipelineVariables SageMaker SDK ドキュメントの最新の安定バージョンと関数の init シグネチャの両方でサポートされています。 たとえば、次の TensorFlow estimator では、init シグネチャが次のことを示しています。 model_dir & image_uri サポート PipelineVariables、他のパラメーターはそうではありません。 詳細については、次を参照してください。 TensorFlow エスティメータ.
    • 前:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • 後:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • パイプライン セッションパイプライン セッション は、SageMaker SDK 全体に統一性をもたらすために導入された新しい概念であり、パイプライン リソースの遅延初期化を導入します (実行呼び出しはキャプチャされますが、パイプラインが作成されて実行されるまで実行されません)。 の PipelineSession コンテキストは SageMakerSession また、トレーニング ジョブ、エンドポイント、および Amazon シンプル ストレージ サービス (Amazon S3)。
  • ワークフロー パイプライン ジョブ ステップとのサブクラスの互換性 – パイプラインを使用しない場合と同様に、ジョブの抽象化を構築し、処理、トレーニング、変換、およびチューニングのジョブを構成および実行できるようになりました。
    • たとえば、次の処理ステップを作成します。 SKLearnProcessor 以前は以下が必要でした:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • 前のコードでわかるように、 ProcessingStep 基本的に同じ前処理ロジックを行う必要があります .run、ジョブを開始するための API 呼び出しを開始しないだけです。 しかし、ワークフロー パイプラインのジョブ ステップでサブクラスの互換性が有効になったため、 step_args .run を使用して前処理ロジックを受け取る引数を使用すると、ジョブの抽象化を構築して、パイプラインなしで使用する場合と同じように構成できます。 また、 pipeline_session、ある PipelineSession 代わりにオブジェクト sagemaker_session 実行呼び出しがキャプチャされていることを確認しますが、パイプラインが作成されて実行されるまで呼び出されません。 次のコードを参照してください。
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • モデル ステップ (モデルの作成と登録のステップによる合理化されたアプローチ) –パイプラインは、SageMaker モデルと統合するための XNUMX つのステップタイプを提供します。 CreateModelStep & RegisterModel. のみを使用して両方を達成できるようになりました ModelStep タイプ。 注意してください PipelineSession これを達成するために必要です。 これにより、パイプライン ステップと SDK の間に類似性がもたらされます。
    • 前:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • 後:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • 失敗ステップ (パイプライン実行の条件付き停止)FailStep モデル スコアが特定のしきい値を下回った場合など、条件が満たされた場合にパイプラインを失敗ステータスで停止できます。

ソリューションの概要

このソリューションでは、エントリ ポイントは Amazon SageMakerスタジオ 迅速な実験のための統合開発環境 (IDE)。 Studio は、エンド ツー エンドのパイプライン エクスペリエンスを管理するための環境を提供します。 Studio では、 AWSマネジメントコンソール ワークフロー全体の管理に。 Studio 内からパイプラインを管理する方法の詳細については、次を参照してください。 SageMaker StudioでSageMakerパイプラインを表示、追跡、実行する.

次の図は、新しい機能を使用して推論をトレーニングおよび生成するためのさまざまな手順を含む ML ワークフローの高レベル アーキテクチャを示しています。

パイプラインには次の手順が含まれます。

  1. データを前処理して必要な機能を構築し、データをトレーニング、検証、およびテスト データセットに分割します。
  2. SageMaker XGBoost フレームワークでトレーニング ジョブを作成します。
  3. テスト データセットを使用してトレーニング済みモデルを評価します。
  4. AUC スコアが事前定義されたしきい値を超えているかどうかを確認します。
    • AUC スコアがしきい値未満の場合は、パイプラインの実行を停止し、失敗としてマークします。
    • AUC スコアがしきい値より大きい場合は、SageMaker モデルを作成し、SageMaker モデル レジストリに登録します。
  5. 前の手順で作成したモデルを使用して、指定されたデータセットにバッチ変換を適用します。

前提条件

この投稿を進めるには、AWS アカウントが必要です。 スタジオ ドメイン.

Pipelines は SageMaker エンティティおよびリソースと直接統合されるため、他の AWS サービスとやり取りする必要はありません。 また、フル マネージド サービスであるため、リソースを管理する必要もありません。つまり、リソースの作成と管理が自動的に行われます。 スタンドアロン Python API と Studio の統合コンポーネントの両方であるさまざまな SageMaker コンポーネントの詳細については、 SageMaker 製品ページ.

開始する前に、次のコード スニペットを使用して Studio ノートブック内に SageMaker SDK バージョン >= 2.104.0 および xlrd >=1.0.0 をインストールします。

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML ワークフロー

この投稿では、次のコンポーネントを使用します。

  • データの準備
    • SageMakerの処理 – SageMaker Processing は、ML ワークロードのカスタム データ変換と機能エンジニアリングを実行できるフルマネージド サービスです。
  • モデル構築
  • モデルのトレーニングと評価
    • ワンクリックトレーニング – SageMaker 分散トレーニング機能。 SageMaker は、データ並列処理とモデル並列処理のための分散トレーニング ライブラリを提供します。 ライブラリは SageMaker トレーニング環境用に最適化されており、分散トレーニング ジョブを SageMaker に適応させ、トレーニングの速度とスループットを向上させます。
    • SageMakerの実験 – Experiments は、ML 反復を整理、追跡、比較、評価できる SageMaker の機能です。
    • SageMakerバッチ変換 – バッチ変換またはオフライン スコアリングは、ML モデルを使用してより大きなデータセットを予測できる SageMaker のマネージド サービスです。
  • ワークフローオーケストレーション

SageMaker パイプラインは、JSON パイプライン定義によって定義された一連の相互接続されたステップです。 有向非巡回グラフ (DAG) を使用してパイプラインをエンコードします。 DAG は、パイプラインの各ステップの要件と関係に関する情報を提供し、その構造はステップ間のデータの依存関係によって決まります。 これらの依存関係は、ステップの出力のプロパティが入力として別のステップに渡されるときに作成されます。

次の図は、ステップ定義によって定義された入力と出力に基づいて、ステップ間の接続が SageMaker によって推測される、SageMaker パイプラインのさまざまなステップ (チャーン予測のユースケース) を示しています。

次のセクションでは、パイプラインの各ステップを作成し、作成したパイプライン全体を実行する方法について説明します。

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。

プロジェクトの構造

プロジェクト構造から始めましょう:

  • /sm-pipelines-エンドツーエンドの例 – プロジェクト名
    • /データ – データセット
    • / pipelines – パイプライン コンポーネントのコード ファイル
      • /カスタマーチャーン
        • 前処理.py
        • 評価.py
    • sagemaker-pipelines-project.ipynb – Pipelines の新機能を使用したモデリング ワークフローを説明するノートブック

データセットをダウンロードする

この投稿をフォローするには、ダウンロードして保存する必要があります サンプルデータセット プロジェクトのホーム ディレクトリ内のデータ フォルダーの下に、ファイルが保存されます。 AmazonElasticファイルシステム (Amazon EFS) Studio 環境内。

パイプライン コンポーネントを構築する

これで、パイプライン コンポーネントを構築する準備が整いました。

ステートメントをインポートし、パラメーターと定数を宣言する

という Studio ノートブックを作成します。 sagemaker-pipelines-project.ipynb プロジェクトのホーム ディレクトリ内。 次のコード ブロックをセルに入力し、セルを実行して SageMaker と S3 クライアント オブジェクトをセットアップし、作成します。 PipelineSession、SageMaker セッションに付属するデフォルトのバケットを使用して S3 バケットの場所を設定します。

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

パイプラインはパラメーター化をサポートしているため、パイプライン コードを変更せずに実行時に入力パラメーターを指定できます。 以下で利用可能なモジュールを使用できます。 sagemaker.workflow.parameters モジュールなど ParameterInteger, ParameterFloat, ParameterString、さまざまなデータ型のパイプライン パラメーターを指定します。 次のコードを実行して、複数の入力パラメーターを設定します。

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

バッチ データセットを生成する

後のバッチ変換ステップで使用するバッチ データセットを生成します。

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

S3 バケットにデータをアップロードする

データセットを Amazon S3 にアップロードします。

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

処理スクリプトと処理ステップを定義する

このステップでは、Python スクリプトを準備して、特徴量エンジニアリング、ワン ホット エンコーディングを実行し、モデル構築に使用するトレーニング、検証、およびテスト分割をキュレートします。 次のコードを実行して、処理スクリプトをビルドします。

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

次に、次のコード ブロックを実行してプロセッサをインスタンス化し、パイプライン ステップを実行して処理スクリプトを実行します。 処理スクリプトは Pandas で記述されているため、 SKLearnプロセッサ. パイプライン ProcessingStep 関数は次の引数を取ります: プロセッサ、生データセットの入力 S3 ロケーション、および処理されたデータセットを保存する出力 S3 ロケーション。

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

トレーニング ステップを定義する

SageMaker XGBoost estimator とパイプラインを使用してモデル トレーニングを設定する TrainingStep 関数:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

評価スクリプトとモデル評価ステップを定義する

次のコード ブロックを実行して、トレーニング後のモデルを評価します。 このスクリプトは、AUC スコアが指定されたしきい値を満たしているかどうかを確認するロジックをカプセル化します。

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

次に、次のコード ブロックを実行してプロセッサをインスタンス化し、パイプライン ステップを実行して評価スクリプトを実行します。 評価スクリプトは XGBoost パッケージを使用するため、 ScriptProcessor XGBoost イメージとともに。 パイプライン ProcessingStep 関数は次の引数を取ります: プロセッサ、生データセットの入力 S3 ロケーション、および処理されたデータセットを保存する出力 S3 ロケーション。

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

モデルの作成ステップを定義する

次のコードブロックを実行して、Pipelines モデルステップを使用して SageMaker モデルを作成します。 このステップでは、トレーニング ステップの出力を利用して、デプロイ用のモデルをパッケージ化します。 インスタンス タイプの引数の値は、この記事の前半で定義した Pipelines パラメータを使用して渡されることに注意してください。

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

バッチ変換ステップを定義する

次のコード ブロックを実行して、最初のステップで作成されたバッチ入力でトレーニング済みモデルを使用してバッチ変換を実行します。

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

モデルの登録ステップを定義する

次のコードは、パイプライン モデル ステップを使用して、SageMaker モデル レジストリ内にモデルを登録します。

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

パイプラインを停止する失敗ステップを定義する

次のコードは、パイプラインの失敗ステップを定義して、AUC スコアが定義されたしきい値を満たさない場合にパイプラインの実行を停止し、エラー メッセージを表示します。

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

AUC スコアを確認する条件ステップを定義する

次のコードは、AUC スコアをチェックし、条件付きでモデルを作成してバッチ変換を実行し、モデルをモデル レジストリに登録するか、失敗した状態でパイプラインの実行を停止する条件ステップを定義します。

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

パイプラインをビルドして実行する

すべてのコンポーネント ステップを定義したら、それらを Pipelines オブジェクトにまとめることができます。 Pipelines はステップ間の依存関係に基づいて順序シーケンスを自動的に推測するため、パイプラインの順序を指定する必要はありません。

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

ノートブックのセルで次のコードを実行します。 パイプラインが既に存在する場合、コードはパイプラインを更新します。 パイプラインが存在しない場合は、新しいパイプラインが作成されます。

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

まとめ

この投稿では、Pipelines で利用できるようになった新機能のいくつかを、他の組み込みの SageMaker 機能と XGBoost アルゴリズムとともに紹介して、チャーン予測用のモデルを開発、反復、デプロイしました。 ソリューションは、追加のデータ ソースで拡張できます

独自の ML ワークフローを実装します。 パイプライン ワークフローで利用可能な手順の詳細については、次を参照してください。 Amazon SageMaker モデル構築パイプライン & SageMaker ワークフローを選択します。 AWS SageMaker の例 GitHub リポジトリには、パイプラインを使用したさまざまなユース ケースに関するその他の例があります。


著者について

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。ジェリー・ペン AWS SageMaker のソフトウェア開発エンジニアです。 彼は、トレーニングから本番環境でのモデル監視まで、エンドツーエンドの大規模 MLOps システムの構築に焦点を当てています。 また、MLOps の概念をより多くの聴衆に広めることにも情熱を注いでいます。

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。ディーウェン・チー AWS のソフトウェア開発エンジニアです。 彼女は現在、SageMaker Pipelines の開発と改善に注力しています。 仕事以外では、彼女はチェロの練習を楽しんでいます。

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。ガヤトリガナコタ は、AW​​S プロフェッショナル サービスのシニア機械学習エンジニアです。 彼女は、さまざまなドメインにわたる AI/ML ソリューションの開発、展開、および説明に情熱を注いでいます。 この役職に就く前は、データ サイエンティストおよび ML エンジニアとして、金融および小売業界のトップ グローバル企業で複数のイニシアチブを率いていました。 彼女はコロラド大学ボールダー校でデータ サイエンスを専門とするコンピュータ サイエンスの修士号を取得しています。

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。ルピンダー・グレワル AWS のシニア Ai/ML スペシャリスト ソリューション アーキテクトです。 彼は現在、SageMaker でのモデルと MLOps の提供に注力しています。 この役職に就く前は、モデルの構築とホスティングを行う機械学習エンジニアとして働いていました。 仕事以外では、テニスや山道でのサイクリングを楽​​しんでいます。

Amazon SageMaker Pipelines と Amazon SageMaker SDK PlatoBlockchain Data Intelligence の新機能。 垂直検索。 あい。レイ・リー AWS プロフェッショナル サービスのシニア データ サイエンティストです。 彼の専門分野は、スタートアップからエンタープライズ組織まで、さまざまな規模の顧客向けの AI/ML ソリューションの構築と運用化に焦点を当てています。 仕事以外では、レイはフィットネスと旅行を楽しんでいます。

タイムスタンプ:

より多くの AWS機械学習