Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.

Neue Funktionen für Amazon SageMaker-Pipelines und das Amazon SageMaker SDK

Amazon SageMaker-Pipelines ermöglicht Datenwissenschaftlern und Ingenieuren für maschinelles Lernen (ML) die Automatisierung von Trainingsworkflows, was Ihnen hilft, einen wiederholbaren Prozess zu erstellen, um Modellentwicklungsschritte für schnelles Experimentieren und Modellneutraining zu orchestrieren. Sie können den gesamten Modellerstellungs-Workflow automatisieren, einschließlich Datenvorbereitung, Feature-Engineering, Modelltraining, Modelloptimierung und Modellvalidierung, und ihn in der Modellregistrierung katalogisieren. Sie können Pipelines so konfigurieren, dass sie automatisch in regelmäßigen Abständen oder beim Auslösen bestimmter Ereignisse ausgeführt werden, oder Sie können sie nach Bedarf manuell ausführen.

In diesem Beitrag heben wir einige der Verbesserungen an der hervor Amazon Sage Maker SDK und führen neue Funktionen von Amazon SageMaker Pipelines ein, die es ML-Praktikern erleichtern, ML-Modelle zu erstellen und zu trainieren.

Pipelines verbessert seine Entwicklererfahrung weiterhin, und mit diesen neuesten Versionen können Sie den Dienst jetzt auf individuellere Weise nutzen:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Aktualisierte Dokumentation auf PipelineVariable Verwendung für Estimator-, Prozessor-, Tuner-, Transformer- und Modellbasisklassen, Amazon-Modelle und Framework-Modelle. Mit neueren Versionen des SDK werden zusätzliche Änderungen vorgenommen, um alle Unterklassen von Schätzern und Prozessoren zu unterstützen.
  • 2.90.0 - Verfügbarkeit von Modellschritt für integrierte Aufgaben zur Erstellung und Registrierung von Modellressourcen.
  • 2.88.2 - Verfügbarkeit von Pipeline-Sitzung für die verwaltete Interaktion mit SageMaker-Einheiten und -Ressourcen.
  • 2.88.2 – Unterklassenkompatibilität für Workflow-Pipeline-Auftragsschritte So können Sie Jobabstraktionen erstellen und Verarbeitungs-, Trainings-, Transformations- und Optimierungsjobs konfigurieren und ausführen, wie Sie es ohne eine Pipeline tun würden.
  • 2.76.0 - Verfügbarkeit von Fehlerschritt um eine Pipeline mit einem Fehlerstatus bedingt zu stoppen.

In diesem Beitrag führen wir Sie anhand eines Beispieldatensatzes durch einen Workflow mit Schwerpunkt auf Modellerstellung und -bereitstellung, um zu demonstrieren, wie die neuen Funktionen von Pipelines implementiert werden. Am Ende sollten Sie über genügend Informationen verfügen, um diese neueren Funktionen erfolgreich zu verwenden und Ihre ML-Workloads zu vereinfachen.

Funktionsübersicht

Pipelines bietet die folgenden neuen Funktionen:

  • Anmerkung zu Pipeline-Variablen – Bestimmte Methodenparameter akzeptieren mehrere Eingabetypen, einschließlich PipelineVariables, und zusätzliche Dokumentation wurde hinzugefügt, um zu verdeutlichen, wo PipelineVariables werden sowohl in der neuesten stabilen Version der SageMaker SDK-Dokumentation als auch in der Init-Signatur der Funktionen unterstützt. Im folgenden TensorFlow-Schätzer zeigt die Init-Signatur dies beispielsweise jetzt an model_dir und image_uri Support PipelineVariables, während die anderen Parameter dies nicht tun. Weitere Informationen finden Sie unter TensorFlow-Schätzer.
    • Vor:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Nach:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline-Sitzung - Pipeline-Sitzung ist ein neues Konzept, das eingeführt wurde, um Einheit über das SageMaker SDK zu bringen, und führt eine verzögerte Initialisierung der Pipeline-Ressourcen ein (die Ausführungsaufrufe werden erfasst, aber nicht ausgeführt, bis die Pipeline erstellt und ausgeführt wird). Das PipelineSession Kontext erbt die SageMakerSession und implementiert praktische Methoden, mit denen Sie mit anderen SageMaker-Entitäten und -Ressourcen interagieren können, z. B. Trainingsjobs, Endpunkte und darin gespeicherte Eingabedatensätze Amazon Simple Storage-Service (Amazon S3).
  • Unterklassenkompatibilität mit Jobschritten der Workflow-Pipeline – Sie können jetzt Jobabstraktionen erstellen und Verarbeitungs-, Trainings-, Transformations- und Optimierungsjobs konfigurieren und ausführen, wie Sie es ohne eine Pipeline tun würden.
    • Erstellen Sie beispielsweise einen Verarbeitungsschritt mit SKLearnProcessor zuvor war Folgendes erforderlich:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Wie wir im vorherigen Code sehen, ProcessingStep muss grundsätzlich die gleiche Vorverarbeitungslogik wie ausführen .run, nur ohne den API-Aufruf zum Starten des Jobs zu initiieren. Aber da die Unterklassenkompatibilität jetzt mit Workflow-Pipeline-Auftragsschritten aktiviert ist, deklarieren wir die step_args -Argument, das die Vorverarbeitungslogik mit .run verwendet, sodass Sie eine Auftragsabstraktion erstellen und so konfigurieren können, wie Sie sie ohne Pipelines verwenden würden. Wir passieren auch in der pipeline_session, die ein PipelineSession Objekt, statt sagemaker_session um sicherzustellen, dass die Ausführungsaufrufe erfasst, aber nicht aufgerufen werden, bis die Pipeline erstellt und ausgeführt wird. Siehe folgenden Code:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Modellschritt (ein optimierter Ansatz mit Modellerstellungs- und Registrierungsschritten) –Pipelines bietet zwei Schritttypen zur Integration mit SageMaker-Modellen: CreateModelStep und RegisterModel. Sie können jetzt beides erreichen, indem Sie nur die verwenden ModelStep Typ. Beachten Sie, dass a PipelineSession ist erforderlich, um dies zu erreichen. Dies bringt Ähnlichkeit zwischen den Pipelineschritten und dem SDK.
    • Vor:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Nach:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fehlerschritt (bedingter Stopp des Pipeline-Laufs) - FailStep ermöglicht das Stoppen einer Pipeline mit einem Fehlerstatus, wenn eine Bedingung erfüllt ist, z. B. wenn der Modellwert unter einem bestimmten Schwellenwert liegt.

Lösungsüberblick

In dieser Lösung ist Ihr Einstiegspunkt die Amazon SageMaker-Studio integrierte Entwicklungsumgebung (IDE) für schnelles Experimentieren. Studio bietet eine Umgebung zum Verwalten der End-to-End-Pipelines-Erfahrung. Mit Studio können Sie das umgehen AWS-Managementkonsole für Ihr gesamtes Workflow-Management. Weitere Informationen zum Verwalten von Pipelines in Studio finden Sie unter Anzeigen, Verfolgen und Ausführen von SageMaker-Pipelines in SageMaker Studio.

Das folgende Diagramm veranschaulicht die High-Level-Architektur des ML-Workflows mit den verschiedenen Schritten zum Trainieren und Generieren von Inferenzen mithilfe der neuen Features.

Die Pipeline umfasst die folgenden Schritte:

  1. Verarbeiten Sie Daten vor, um die erforderlichen Funktionen zu erstellen, und teilen Sie Daten in Trainings-, Validierungs- und Testdatensätze auf.
  2. Erstellen Sie einen Trainingsjob mit dem SageMaker XGBoost-Framework.
  3. Bewerten Sie das trainierte Modell mithilfe des Testdatasets.
  4. Überprüfen Sie, ob der AUC-Score über einem vordefinierten Schwellenwert liegt.
    • Wenn der AUC-Wert unter dem Schwellenwert liegt, stoppen Sie die Pipelineausführung und markieren Sie sie als fehlgeschlagen.
    • Wenn der AUC-Score größer als der Schwellenwert ist, erstellen Sie ein SageMaker-Modell und registrieren Sie es in der SageMaker-Modellregistrierung.
  5. Wenden Sie die Stapeltransformation auf das angegebene Dataset an, indem Sie das im vorherigen Schritt erstellte Modell verwenden.

Voraussetzungen:

Um diesem Beitrag folgen zu können, benötigen Sie ein AWS-Konto mit a Studio-Domain.

Pipelines ist direkt in SageMaker-Entitäten und -Ressourcen integriert, sodass Sie nicht mit anderen AWS-Services interagieren müssen. Sie müssen auch keine Ressourcen verwalten, da es sich um einen vollständig verwalteten Dienst handelt, was bedeutet, dass Ressourcen für Sie erstellt und verwaltet werden. Weitere Informationen zu den verschiedenen SageMaker-Komponenten, die sowohl eigenständige Python-APIs als auch integrierte Komponenten von Studio sind, finden Sie unter SageMaker-Produktseite.

Bevor Sie beginnen, installieren Sie SageMaker SDK Version >= 2.104.0 und xlrd >=1.0.0 im Studio-Notebook mit dem folgenden Code-Snippet:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML-Workflow

Für diesen Beitrag verwenden Sie die folgenden Komponenten:

  • Datenaufbereitung
    • SageMaker-Verarbeitung – SageMaker Processing ist ein vollständig verwalteter Service, mit dem Sie benutzerdefinierte Datentransformationen und Feature-Engineering für ML-Workloads ausführen können.
  • Modellbau
  • Modelltraining und Evaluation
    • Ein-Klick-Training – Die verteilte Trainingsfunktion von SageMaker. SageMaker bietet verteilte Trainingsbibliotheken für Datenparallelität und Modellparallelität. Die Bibliotheken sind für die SageMaker-Trainingsumgebung optimiert, helfen bei der Anpassung Ihrer verteilten Trainingsjobs an SageMaker und verbessern die Trainingsgeschwindigkeit und den Durchsatz.
    • SageMaker-Experimente – Experimente ist eine Funktion von SageMaker, mit der Sie Ihre ML-Iterationen organisieren, verfolgen, vergleichen und bewerten können.
    • SageMaker-Batch-Transformation – Batch-Transformation oder Offline-Scoring ist ein verwalteter Dienst in SageMaker, mit dem Sie mithilfe Ihrer ML-Modelle Vorhersagen für einen größeren Datensatz treffen können.
  • Workflow-Orchestrierung

Eine SageMaker-Pipeline ist eine Reihe miteinander verbundener Schritte, die durch eine JSON-Pipelinedefinition definiert werden. Es codiert eine Pipeline mithilfe eines gerichteten azyklischen Graphen (DAG). Die DAG liefert Informationen zu den Anforderungen und Beziehungen zwischen den einzelnen Schritten der Pipeline, und ihre Struktur wird durch die Datenabhängigkeiten zwischen den Schritten bestimmt. Diese Abhängigkeiten werden erstellt, wenn die Eigenschaften der Ausgabe eines Schritts als Eingabe an einen anderen Schritt übergeben werden.

Das folgende Diagramm veranschaulicht die verschiedenen Schritte in der SageMaker-Pipeline (für einen Anwendungsfall der Abwanderungsvorhersage), wobei die Verbindungen zwischen den Schritten von SageMaker basierend auf den durch die Schrittdefinitionen definierten Eingaben und Ausgaben abgeleitet werden.

In den nächsten Abschnitten werden die einzelnen Schritte der Pipeline erstellt und die gesamte Pipeline ausgeführt, nachdem sie erstellt wurde.

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.

Projektstruktur

Beginnen wir mit der Projektstruktur:

  • /sm-pipelines-End-to-End-Beispiel – Der Projektname
    • /Daten – Die Datensätze
    • /Pipelines – Die Codedateien für Pipelinekomponenten
      • /Kundenabwanderung
        • preprocess.py
        • auswerten.py
    • sagemaker-pipelines-project.ipynb – Ein Notebook, das durch den Modellierungsworkflow mit den neuen Funktionen von Pipelines führt

Laden Sie den Datensatz herunter

Um diesem Beitrag zu folgen, müssen Sie die herunterladen und speichern Beispieldatensatz unter dem Datenordner im Home-Verzeichnis des Projekts, in dem die Datei gespeichert wird Amazon Elastic File System (Amazon EFS) innerhalb der Studio-Umgebung.

Erstellen Sie die Pipelinekomponenten

Jetzt können Sie die Pipelinekomponenten erstellen.

Importieren Sie Anweisungen und deklarieren Sie Parameter und Konstanten

Erstellen Sie ein Studio-Notizbuch mit dem Namen sagemaker-pipelines-project.ipynb innerhalb des Home-Verzeichnisses des Projekts. Geben Sie den folgenden Codeblock in eine Zelle ein und führen Sie die Zelle aus, um SageMaker- und S3-Client-Objekte einzurichten, create PipelineSession, und richten Sie den S3-Bucket-Speicherort mit dem Standard-Bucket ein, der mit einer SageMaker-Sitzung geliefert wird:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines unterstützt die Parametrisierung, wodurch Sie Eingabeparameter zur Laufzeit angeben können, ohne Ihren Pipeline-Code zu ändern. Sie können die unter der verfügbaren Module verwenden sagemaker.workflow.parameters Modul, wie z ParameterInteger, ParameterFloat und ParameterString, um Pipelineparameter verschiedener Datentypen anzugeben. Führen Sie den folgenden Code aus, um mehrere Eingabeparameter einzurichten:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generieren Sie ein Batch-Dataset

Generieren Sie das Batch-Dataset, das Sie später im Batch-Transformationsschritt verwenden:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Hochladen von Daten in einen S3-Bucket

Laden Sie die Datensätze in Amazon S3 hoch:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definieren Sie ein Verarbeitungsskript und einen Verarbeitungsschritt

In diesem Schritt bereiten Sie ein Python-Skript vor, um Feature-Engineering, eine Hot-Codierung durchzuführen und die Trainings-, Validierungs- und Testaufteilungen zu kuratieren, die für die Modellerstellung verwendet werden sollen. Führen Sie den folgenden Code aus, um Ihr Verarbeitungsskript zu erstellen:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Führen Sie als Nächstes den folgenden Codeblock aus, um den Prozessor und den Schritt „Pipelines“ zu instanziieren, um das Verarbeitungsskript auszuführen. Da das Verarbeitungsskript in Pandas geschrieben ist, verwenden Sie a SKLearnProcessor. Die Pipelines ProcessingStep Die Funktion akzeptiert die folgenden Argumente: den Prozessor, die S3-Eingabespeicherorte für Rohdatensätze und die S3-Ausgabespeicherorte zum Speichern verarbeiteter Datensätze.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definieren Sie einen Trainingsschritt

Richten Sie das Modelltraining mit einem SageMaker XGBoost-Estimator und den Pipelines ein TrainingStep Funktion:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definieren Sie das Bewertungsskript und den Modellbewertungsschritt

Führen Sie den folgenden Codeblock aus, um das Modell nach dem Training auszuwerten. Dieses Skript kapselt die Logik zur Überprüfung, ob der AUC-Score den angegebenen Schwellenwert erreicht.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Führen Sie als Nächstes den folgenden Codeblock aus, um den Prozessor und den Schritt „Pipelines“ zu instanziieren, um das Evaluierungsskript auszuführen. Da das Evaluierungsskript das XGBoost-Paket verwendet, verwenden Sie a ScriptProcessor zusammen mit dem XGBoost-Image. Die Pipelines ProcessingStep Die Funktion akzeptiert die folgenden Argumente: den Prozessor, die S3-Eingabespeicherorte für Rohdatensätze und die S3-Ausgabespeicherorte zum Speichern verarbeiteter Datensätze.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definieren Sie einen Modellerstellungsschritt

Führen Sie den folgenden Codeblock aus, um ein SageMaker-Modell mithilfe des Pipelines-Modellschritts zu erstellen. Dieser Schritt verwendet die Ausgabe des Trainingsschritts, um das Modell für die Bereitstellung zu verpacken. Beachten Sie, dass der Wert für das Instance-Typ-Argument mithilfe des Pipelines-Parameters übergeben wird, den Sie zuvor in diesem Beitrag definiert haben.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definieren Sie einen Stapeltransformationsschritt

Führen Sie den folgenden Codeblock aus, um die Batch-Transformation mithilfe des trainierten Modells mit der im ersten Schritt erstellten Batch-Eingabe auszuführen:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definieren Sie einen Registrierungsmodellschritt

Der folgende Code registriert das Modell in der SageMaker-Modellregistrierung mithilfe des Pipelines-Modellschritts:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definieren Sie einen Fehlerschritt, um die Pipeline zu stoppen

Der folgende Code definiert den Pipelines-Fail-Schritt, um die Pipelineausführung mit einer Fehlermeldung zu stoppen, wenn der AUC-Score den definierten Schwellenwert nicht erreicht:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definieren Sie einen Bedingungsschritt, um den AUC-Score zu überprüfen

Der folgende Code definiert einen Bedingungsschritt, um den AUC-Score zu prüfen und ein Modell bedingt zu erstellen und eine Stapeltransformation auszuführen und ein Modell in der Modellregistrierung zu registrieren oder die Pipelineausführung in einem fehlgeschlagenen Zustand zu stoppen:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Erstellen Sie die Pipeline und führen Sie sie aus

Nachdem Sie alle Komponentenschritte definiert haben, können Sie sie zu einem Pipelines-Objekt zusammenfügen. Sie müssen die Reihenfolge der Pipeline nicht angeben, da Pipelines die Reihenfolge automatisch auf der Grundlage der Abhängigkeiten zwischen den Schritten ableitet.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Führen Sie den folgenden Code in einer Zelle in Ihrem Notizbuch aus. Wenn die Pipeline bereits vorhanden ist, aktualisiert der Code die Pipeline. Wenn die Pipeline nicht vorhanden ist, wird eine neue erstellt.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Zusammenfassung

In diesem Beitrag haben wir einige der neuen Funktionen vorgestellt, die jetzt mit Pipelines verfügbar sind, zusammen mit anderen integrierten SageMaker-Funktionen und dem XGBoost-Algorithmus zum Entwickeln, Iterieren und Bereitstellen eines Modells für die Abwanderungsvorhersage. Die Lösung kann um zusätzliche Datenquellen erweitert werden

um Ihren eigenen ML-Workflow zu implementieren. Weitere Einzelheiten zu den im Pipelines-Workflow verfügbaren Schritten finden Sie unter Amazon SageMaker-Modellerstellungspipeline und SageMaker-Workflowsdem „Vermischten Geschmack“. Seine AWS SageMaker-Beispiele Das GitHub-Repository enthält weitere Beispiele für verschiedene Anwendungsfälle mit Pipelines.


Über die Autoren

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Jerry Peng ist Softwareentwicklungsingenieur bei AWS SageMaker. Er konzentriert sich auf den Aufbau eines umfassenden MLOps-Systems im großen Maßstab von der Schulung bis zur Modellüberwachung in der Produktion. Er ist auch leidenschaftlich daran interessiert, das Konzept von MLOps einem breiteren Publikum zugänglich zu machen.

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Dewen Qi ist Softwareentwicklungsingenieur bei AWS. Derzeit konzentriert sie sich auf die Entwicklung und Verbesserung von SageMaker-Pipelines. Außerhalb der Arbeit übt sie gerne Cello.

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Gayatri Ghanakota ist Senior Machine Learning Engineer bei AWS Professional Services. Ihre Leidenschaft gilt der Entwicklung, Bereitstellung und Erläuterung von KI/ML-Lösungen in verschiedenen Bereichen. Vor dieser Funktion leitete sie mehrere Initiativen als Datenwissenschaftlerin und ML-Ingenieurin bei weltweit führenden Unternehmen im Finanz- und Einzelhandelsbereich. Sie hat einen Master-Abschluss in Informatik mit Spezialisierung auf Data Science von der University of Colorado, Boulder.

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Rupinder Grewal ist ein Sr Ai/ML Specialist Solutions Architect bei AWS. Derzeit konzentriert er sich auf die Bereitstellung von Modellen und MLOps auf SageMaker. Vor dieser Funktion hat er als Machine Learning Engineer gearbeitet und Modelle erstellt und gehostet. Außerhalb der Arbeit spielt er gerne Tennis und radelt auf Bergpfaden.

Neue Funktionen für Amazon SageMaker Pipelines und die Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Ray Li ist Senior Data Scientist bei AWS Professional Services. Seine Spezialität konzentriert sich auf den Aufbau und die Operationalisierung von KI/ML-Lösungen für Kunden unterschiedlicher Größe, von Start-ups bis hin zu Großunternehmen. Außerhalb der Arbeit genießt Ray Fitness und Reisen.

Zeitstempel:

Mehr von AWS Maschinelles Lernen