Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK

Pipa Amazon SageMaker memungkinkan ilmuwan data dan insinyur pembelajaran mesin (ML) untuk mengotomatiskan alur kerja pelatihan, yang membantu Anda membuat proses berulang untuk mengatur langkah-langkah pengembangan model untuk eksperimen cepat dan pelatihan ulang model. Anda dapat mengotomatiskan seluruh alur kerja pembuatan model, termasuk persiapan data, rekayasa fitur, pelatihan model, penyetelan model, dan validasi model, serta membuat katalognya di registri model. Anda dapat mengonfigurasi pipeline agar berjalan secara otomatis pada interval reguler atau saat peristiwa tertentu dipicu, atau Anda dapat menjalankannya secara manual sesuai kebutuhan.

Dalam posting ini, kami menyoroti beberapa peningkatan pada Amazon SageMaker SDK dan perkenalkan fitur baru Amazon SageMaker Pipelines yang memudahkan praktisi ML untuk membangun dan melatih model ML.

Pipelines terus berinovasi dalam pengalaman pengembangnya, dan dengan rilis terbaru ini, Anda kini dapat menggunakan layanan dengan cara yang lebih disesuaikan:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Dokumentasi yang diperbarui pada PipelineVariable penggunaan untuk estimator, prosesor, tuner, transformator, dan kelas dasar model, model Amazon, dan model kerangka kerja. Akan ada perubahan tambahan yang datang dengan versi SDK yang lebih baru untuk mendukung semua subkelas penaksir dan pemroses.
  • 2.90.0 - Ketersediaan ModelLangkah untuk tugas-tugas pembuatan dan pendaftaran sumber daya model terintegrasi.
  • 2.88.2 - Ketersediaan Sesi Pipa untuk interaksi terkelola dengan entitas dan sumber daya SageMaker.
  • 2.88.2 – Kompatibilitas subkelas untuk alur kerja alur kerja alur kerja sehingga Anda dapat membangun abstraksi pekerjaan dan mengonfigurasi serta menjalankan pemrosesan, pelatihan, transformasi, dan penyetelan pekerjaan seperti yang Anda lakukan tanpa pipeline.
  • 2.76.0 - Ketersediaan GagalLangkah untuk menghentikan pipa dengan status kegagalan secara kondisional.

Dalam postingan ini, kami memandu Anda melalui alur kerja menggunakan kumpulan data sampel dengan fokus pada pembuatan dan penerapan model untuk mendemonstrasikan cara mengimplementasikan fitur baru Pipelines. Pada akhirnya, Anda harus memiliki informasi yang cukup untuk berhasil menggunakan fitur-fitur baru ini dan menyederhanakan beban kerja ML Anda.

Ikhtisar fitur

Pipelines menawarkan fitur baru berikut:

  • Anotasi variabel saluran pipa – Parameter metode tertentu menerima beberapa jenis input, termasuk PipelineVariables, dan dokumentasi tambahan telah ditambahkan untuk memperjelas di mana PipelineVariables didukung dalam versi stabil terbaru dari dokumentasi SageMaker SDK dan tanda tangan init dari fungsi. Misalnya, dalam penaksir TensorFlow berikut, tanda tangan init sekarang menunjukkan bahwa model_dir dan image_uri mendukung PipelineVariables, sedangkan parameter lainnya tidak. Untuk informasi lebih lanjut, lihat Penaksir TensorFlow.
    • Sebelum:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Setelah:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sesi pipa - Sesi Pipa adalah konsep baru yang diperkenalkan untuk menghadirkan kesatuan di SageMaker SDK dan memperkenalkan inisialisasi sumber daya pipeline (panggilan run ditangkap tetapi tidak dijalankan hingga pipeline dibuat dan dijalankan). Itu PipelineSession konteks mewarisi SageMakerSession dan mengimplementasikan metode yang mudah bagi Anda untuk berinteraksi dengan entitas dan sumber daya SageMaker lainnya, seperti tugas pelatihan, titik akhir, dan set data masukan yang disimpan di Layanan Penyimpanan Sederhana Amazon (Amazon S3).
  • Kompatibilitas subkelas dengan langkah-langkah pekerjaan pipa alur kerja – Anda sekarang dapat membangun abstraksi pekerjaan dan mengonfigurasi serta menjalankan pemrosesan, pelatihan, transformasi, dan penyetelan pekerjaan seperti yang Anda lakukan tanpa pipeline.
    • Misalnya, membuat langkah pemrosesan dengan SKLearnProcessor sebelumnya diperlukan sebagai berikut:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Seperti yang kita lihat pada kode sebelumnya, ProcessingStep perlu melakukan logika preprocessing yang pada dasarnya sama dengan .run, hanya tanpa memulai panggilan API untuk memulai pekerjaan. Tetapi dengan kompatibilitas subkelas yang sekarang diaktifkan dengan langkah-langkah pekerjaan pipa alur kerja, kami mendeklarasikan step_args argumen yang menggunakan logika prapemrosesan dengan .run sehingga Anda dapat membuat abstraksi pekerjaan dan mengonfigurasinya seperti yang akan Anda gunakan tanpa Pipeline. Kami juga lulus di pipeline_session, Yang merupakan PipelineSession objek, bukannya sagemaker_session untuk memastikan panggilan yang dijalankan ditangkap tetapi tidak dipanggil sampai pipa dibuat dan dijalankan. Lihat kode berikut:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Langkah model (pendekatan yang disederhanakan dengan langkah pembuatan dan pendaftaran model) –Pipelines menawarkan dua jenis langkah untuk diintegrasikan dengan model SageMaker: CreateModelStep dan RegisterModel. Anda sekarang dapat mencapai keduanya hanya dengan menggunakan ModelStep Tipe. Perhatikan bahwa PipelineSession diperlukan untuk mencapai hal ini. Ini membawa kesamaan antara langkah pipa dan SDK.
    • Sebelum:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Setelah:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Langkah gagal (penghentian bersyarat dari jalur pipa) - FailStep memungkinkan pipa dihentikan dengan status kegagalan jika suatu kondisi terpenuhi, seperti jika skor model di bawah ambang batas tertentu.

Ikhtisar solusi

Dalam solusi ini, titik masuk Anda adalah Studio Amazon SageMaker lingkungan pengembangan terintegrasi (IDE) untuk eksperimen cepat. Studio menawarkan lingkungan untuk mengelola pengalaman Pipeline menyeluruh. Dengan Studio, Anda dapat melewati Konsol Manajemen AWS untuk seluruh manajemen alur kerja Anda. Untuk informasi lebih lanjut tentang mengelola Pipeline dari dalam Studio, lihat Lihat, Lacak, dan Jalankan Pipeline SageMaker di SageMaker Studio.

Diagram berikut mengilustrasikan arsitektur tingkat tinggi dari alur kerja ML dengan langkah-langkah berbeda untuk melatih dan menghasilkan inferensi menggunakan fitur baru.

Pipa mencakup langkah-langkah berikut:

  1. Praproses data untuk membangun fitur yang diperlukan dan membagi data menjadi set data kereta, validasi, dan pengujian.
  2. Buat tugas pelatihan dengan kerangka kerja SageMaker XGBoost.
  3. Evaluasi model yang dilatih menggunakan dataset uji.
  4. Periksa apakah skor AUC di atas ambang batas yang telah ditentukan.
    • Jika skor AUC kurang dari ambang batas, hentikan jalur pipa dan tandai sebagai gagal.
    • Jika skor AUC lebih besar dari ambang batas, buat model SageMaker dan daftarkan di registri model SageMaker.
  5. Terapkan transformasi batch pada kumpulan data yang diberikan menggunakan model yang dibuat pada langkah sebelumnya.

Prasyarat

Untuk mengikuti postingan ini, Anda memerlukan akun AWS dengan a Domain studio.

Pipeline terintegrasi langsung dengan entitas dan sumber daya SageMaker, sehingga Anda tidak perlu berinteraksi dengan layanan AWS lainnya. Anda juga tidak perlu mengelola sumber daya apa pun karena ini adalah layanan yang terkelola sepenuhnya, yang berarti ia membuat dan mengelola sumber daya untuk Anda. Untuk informasi selengkapnya tentang berbagai komponen SageMaker yang merupakan API Python mandiri bersama dengan komponen Studio yang terintegrasi, lihat Halaman produk SageMaker.

Sebelum memulai, instal SageMaker SDK versi >= 2.104.0 dan xlrd >=1.0.0 dalam notebook Studio menggunakan cuplikan kode berikut:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

alur kerja ML

Untuk posting ini, Anda menggunakan komponen berikut:

  • Persiapan data
    • Pengolahan SageMaker – SageMaker Processing adalah layanan terkelola sepenuhnya yang memungkinkan Anda menjalankan transformasi data kustom dan rekayasa fitur untuk beban kerja ML.
  • Bangunan model
  • Pelatihan dan evaluasi model
    • Pelatihan sekali klik – Fitur pelatihan yang didistribusikan SageMaker. SageMaker menyediakan perpustakaan pelatihan terdistribusi untuk paralelisme data dan paralelisme model. Pustaka dioptimalkan untuk lingkungan pelatihan SageMaker, membantu menyesuaikan tugas pelatihan terdistribusi Anda ke SageMaker, dan meningkatkan kecepatan dan hasil pelatihan.
    • Eksperimen SageMaker – Eksperimen adalah kemampuan SageMaker yang memungkinkan Anda mengatur, melacak, membandingkan, dan mengevaluasi iterasi ML Anda.
    • Transformasi batch SageMaker – Transformasi batch atau penilaian offline adalah layanan terkelola di SageMaker yang memungkinkan Anda memprediksi pada kumpulan data yang lebih besar menggunakan model ML Anda.
  • Orkestrasi alur kerja

Pipeline SageMaker adalah serangkaian langkah yang saling berhubungan yang ditentukan oleh definisi pipeline JSON. Ini mengkodekan pipa menggunakan grafik asiklik terarah (DAG). DAG memberikan informasi tentang persyaratan dan hubungan antara setiap langkah pipa, dan strukturnya ditentukan oleh dependensi data antar langkah. Dependensi ini dibuat ketika properti dari output langkah dilewatkan sebagai input ke langkah lain.

Diagram berikut mengilustrasikan langkah-langkah berbeda dalam pipeline SageMaker (untuk kasus penggunaan prediksi churn) di mana koneksi antara langkah-langkah tersebut disimpulkan oleh SageMaker berdasarkan input dan output yang ditentukan oleh definisi langkah.

Bagian selanjutnya berjalan melalui pembuatan setiap langkah pipa dan menjalankan seluruh pipa setelah dibuat.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.

Struktur proyek

Mari kita mulai dengan struktur proyek:

  • /contoh-sm-pipelines-end-to-end – Nama proyek
    • /data – Dataset
    • /pipa – File kode untuk komponen pipa
      • / pelangganchurn
        • praproses.py
        • evaluasi.py
    • sagemaker-pipelines-project.ipynb – Buku catatan yang menelusuri alur kerja pemodelan menggunakan fitur baru Pipelines

Unduh kumpulan datanya

Untuk mengikuti posting ini, Anda perlu mengunduh dan menyimpan contoh kumpulan data di bawah folder data dalam direktori home proyek, yang menyimpan file di Sistem File Amazon Elastis (Amazon EFS) dalam lingkungan Studio.

Bangun komponen pipa

Sekarang Anda siap untuk membangun komponen pipa.

Impor pernyataan dan deklarasikan parameter dan konstanta

Buat buku catatan Studio bernama sagemaker-pipelines-project.ipynb dalam direktori home proyek. Masukkan blok kode berikut dalam sel, dan jalankan sel untuk mengatur objek klien SageMaker dan S3, buat PipelineSession, dan atur lokasi bucket S3 menggunakan bucket default yang disertakan dengan sesi SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines mendukung parameterisasi, yang memungkinkan Anda menentukan parameter input saat runtime tanpa mengubah kode pipeline Anda. Anda dapat menggunakan modul yang tersedia di bawah sagemaker.workflow.parameters modul, seperti ParameterInteger, ParameterFloat, dan ParameterString, untuk menentukan parameter pipeline dari berbagai tipe data. Jalankan kode berikut untuk menyiapkan beberapa parameter input:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Hasilkan kumpulan data batch

Hasilkan kumpulan data batch, yang Anda gunakan nanti dalam langkah transformasi batch:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Unggah data ke ember S3

Unggah kumpulan data ke Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Tentukan skrip pemrosesan dan langkah pemrosesan

Pada langkah ini, Anda menyiapkan skrip Python untuk melakukan rekayasa fitur, satu pengkodean panas, dan menyusun pelatihan, validasi, dan pemisahan pengujian yang akan digunakan untuk pembuatan model. Jalankan kode berikut untuk membuat skrip pemrosesan Anda:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Selanjutnya, jalankan blok kode berikut untuk membuat instance prosesor dan langkah Pipelines untuk menjalankan skrip pemrosesan. Karena skrip pemrosesan ditulis dalam Pandas, Anda menggunakan a Prosesor SKLearn. Jalur Pipa ProcessingStep fungsi mengambil argumen berikut: prosesor, lokasi input S3 untuk dataset mentah, dan lokasi output S3 untuk menyimpan dataset yang diproses.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Tentukan langkah pelatihan

Siapkan pelatihan model menggunakan estimator SageMaker XGBoost dan Pipelines TrainingStep fungsi:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Tentukan skrip evaluasi dan langkah evaluasi model

Jalankan blok kode berikut untuk mengevaluasi model setelah dilatih. Skrip ini merangkum logika untuk memeriksa apakah skor AUC memenuhi ambang batas yang ditentukan.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Selanjutnya, jalankan blok kode berikut untuk membuat instance prosesor dan langkah Pipelines untuk menjalankan skrip evaluasi. Karena skrip evaluasi menggunakan paket XGBoost, Anda menggunakan a ScriptProcessor bersama dengan gambar XGBoost. Jalur Pipa ProcessingStep fungsi mengambil argumen berikut: prosesor, lokasi input S3 untuk dataset mentah, dan lokasi output S3 untuk menyimpan dataset yang diproses.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Tentukan langkah membuat model

Jalankan blok kode berikut untuk membuat model SageMaker menggunakan langkah model Pipelines. Langkah ini memanfaatkan output dari langkah pelatihan untuk mengemas model untuk penerapan. Perhatikan bahwa nilai untuk argumen tipe instans diteruskan menggunakan parameter Pipelines yang Anda tetapkan sebelumnya di postingan.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Tentukan langkah transformasi batch

Jalankan blok kode berikut untuk menjalankan transformasi batch menggunakan model terlatih dengan input batch yang dibuat pada langkah pertama:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Tentukan langkah model register

Kode berikut mendaftarkan model dalam registri model SageMaker menggunakan langkah model Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Tentukan langkah gagal untuk menghentikan pipa

Kode berikut mendefinisikan langkah Pipelines fail untuk menghentikan pipeline yang berjalan dengan pesan kesalahan jika skor AUC tidak memenuhi ambang batas yang ditentukan:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Tentukan langkah kondisi untuk memeriksa skor AUC

Kode berikut menentukan langkah kondisi untuk memeriksa skor AUC dan membuat model secara kondisional dan menjalankan transformasi batch dan mendaftarkan model di registri model, atau menghentikan jalur pipa yang dijalankan dalam keadaan gagal:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Bangun dan jalankan pipa

Setelah mendefinisikan semua langkah komponen, Anda dapat merakitnya menjadi objek Pipelines. Anda tidak perlu menentukan urutan pipeline karena Pipelines secara otomatis menyimpulkan urutan urutan berdasarkan dependensi di antara langkah-langkahnya.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Jalankan kode berikut dalam sel di buku catatan Anda. Jika pipa sudah ada, kode memperbarui pipa. Jika pipa tidak ada, itu membuat yang baru.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Kesimpulan

Dalam posting ini, kami memperkenalkan beberapa fitur baru yang sekarang tersedia dengan Pipelines bersama dengan fitur SageMaker bawaan lainnya dan algoritme XGBoost untuk mengembangkan, mengulangi, dan menerapkan model untuk prediksi churn. Solusinya dapat diperluas dengan sumber data tambahan

untuk mengimplementasikan alur kerja ML Anda sendiri. Untuk detail selengkapnya tentang langkah-langkah yang tersedia di alur kerja Pipeline, lihat Pipa Pembuatan Model Amazon SageMaker dan Alur Kerja SageMaker. itu Contoh AWS SageMaker Repo GitHub memiliki lebih banyak contoh seputar berbagai kasus penggunaan menggunakan Pipelines.


Tentang Penulis

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Jerry Peng adalah insinyur pengembangan perangkat lunak dengan AWS SageMaker. Dia berfokus untuk membangun sistem MLOps skala besar yang menyeluruh mulai dari pelatihan hingga pemantauan model dalam produksi. Dia juga bersemangat membawa konsep MLOps ke khalayak yang lebih luas.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Dewi Qi adalah Insinyur Pengembangan Perangkat Lunak di AWS. Dia saat ini berfokus pada pengembangan dan peningkatan SageMaker Pipelines. Di luar pekerjaan, dia senang berlatih Cello.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Gayatri Ghanakota adalah Insinyur Pembelajaran Mesin Senior dengan Layanan Profesional AWS. Dia bersemangat mengembangkan, menerapkan, dan menjelaskan solusi AI/ML di berbagai domain. Sebelum peran ini, ia memimpin berbagai inisiatif sebagai ilmuwan data dan insinyur ML dengan perusahaan global terkemuka di bidang keuangan dan ritel. Dia meraih gelar master di bidang Ilmu Komputer dengan spesialisasi Ilmu Data dari University of Colorado, Boulder.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Rupinder Grewal adalah Arsitek Solusi Spesialis Sr Ai/ML dengan AWS. Dia saat ini berfokus pada penyajian model dan MLOps di SageMaker. Sebelum peran ini, ia telah bekerja sebagai Insinyur Pembelajaran Mesin yang membangun dan menjadi model hosting. Di luar pekerjaan ia menikmati bermain tenis dan bersepeda di jalur gunung.

Fitur baru untuk Amazon SageMaker Pipelines dan Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Ray Li adalah Sr. Data Scientist dengan AWS Professional Services. Spesialisasinya berfokus pada membangun dan mengoperasionalkan solusi AI/ML untuk pelanggan dengan berbagai ukuran, mulai dari perusahaan rintisan hingga organisasi perusahaan. Di luar pekerjaan, Ray menikmati kebugaran dan bepergian.

Stempel Waktu:

Lebih dari Pembelajaran Mesin AWS