Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK

Amazon SageMaker-rörledningar låter datavetare och maskininlärningsingenjörer (ML) automatisera träningsarbetsflöden, vilket hjälper dig att skapa en repeterbar process för att organisera modellutvecklingssteg för snabba experiment och modellomskolning. Du kan automatisera hela arbetsflödet för modellbyggandet, inklusive dataförberedelse, funktionsutveckling, modellträning, modellinställning och modellvalidering, och katalogisera det i modellregistret. Du kan konfigurera pipelines att köras automatiskt med jämna mellanrum eller när vissa händelser utlöses, eller så kan du köra dem manuellt efter behov.

I det här inlägget lyfter vi fram några av förbättringarna av Amazon SageMaker SDK och introducera nya funktioner i Amazon SageMaker Pipelines som gör det lättare för ML-utövare att bygga och träna ML-modeller.

Pipelines fortsätter att förnya sin utvecklarupplevelse, och med dessa senaste utgåvor kan du nu använda tjänsten på ett mer anpassat sätt:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Uppdaterad dokumentation på PipelineVariable användning för estimator, processor, tuner, transformator och modellbasklasser, Amazon-modeller och rammodeller. Det kommer att komma ytterligare ändringar med nyare versioner av SDK för att stödja alla underklasser av estimatorer och processorer.
  • 2.90.0 - Tillgänglighet av ModelStep för integrerad modellresursskapande och registreringsuppgifter.
  • 2.88.2 - Tillgänglighet av PipelineSession för hanterad interaktion med SageMaker-enheter och resurser.
  • 2.88.2 – Underklasskompatibilitet för arbetsflöde pipeline jobb steg så att du kan bygga jobbabstraktioner och konfigurera och köra bearbetnings-, utbildnings-, transformerings- och tuningjobb som du skulle göra utan en pipeline.
  • 2.76.0 - Tillgänglighet av FailStep att villkorligt stoppa en pipeline med felstatus.

I det här inlägget går vi igenom ett arbetsflöde med hjälp av en exempeldatauppsättning med fokus på modellbyggande och implementering för att demonstrera hur man implementerar Pipelines nya funktioner. I slutet bör du ha tillräckligt med information för att framgångsrikt kunna använda dessa nyare funktioner och förenkla dina ML-arbetsbelastningar.

Funktioner översikt

Pipelines erbjuder följande nya funktioner:

  • Anteckning för rörledningsvariabel – Vissa metodparametrar accepterar flera inmatningstyper, inklusive PipelineVariables, och ytterligare dokumentation har lagts till för att klargöra var PipelineVariables stöds i både den senaste stabila versionen av SageMaker SDK-dokumentation och initsignaturen för funktionerna. Till exempel, i följande TensorFlow-estimator, visar nu init-signaturen det model_dir och image_uri stödja PipelineVariables, medan de andra parametrarna inte gör det. För mer information, se TensorFlow Estimator.
    • Innan:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Efter:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline session - PipelineSession är ett nytt koncept som introducerats för att skapa enhet över SageMaker SDK och introducerar lat initiering av pipelineresurserna (körningsanropen fångas upp men körs inte förrän pipelinen skapas och körs). De PipelineSession sammanhang ärver SageMakerSession och implementerar bekväma metoder för dig att interagera med andra SageMaker-enheter och resurser, såsom utbildningsjobb, slutpunkter och indatauppsättningar lagrade i Amazon enkel lagringstjänst (Amazon S3).
  • Underklasskompatibilitet med arbetssteg för arbetsflödespipeline – Du kan nu bygga jobbabstraktioner och konfigurera och köra bearbetnings-, utbildnings-, transformerings- och finjusteringsjobb som du skulle göra utan en pipeline.
    • Till exempel att skapa ett bearbetningssteg med SKLearnProcessor tidigare krävde följande:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Som vi ser i föregående kod, ProcessingStep behöver göra i princip samma förbearbetningslogik som .run, bara utan att starta API-anropet för att starta jobbet. Men med underklasskompatibilitet nu aktiverad med arbetsflödespipelinejobbssteg, förklarar vi att step_args argument som tar förbearbetningslogiken med .run så att du kan bygga en jobbabstraktion och konfigurera den som du skulle använda den utan Pipelines. Vi passerar också i pipeline_session, Som är en PipelineSession objekt istället för sagemaker_session för att se till att körningsanropen fångas upp men inte anropas förrän pipelinen skapas och körs. Se följande kod:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Modellsteg (en strömlinjeformad metod med modellskapande och registreringssteg) – Pipelines erbjuder två stegtyper att integrera med SageMaker-modeller: CreateModelStep och RegisterModel. Du kan nu uppnå båda genom att endast använda ModelStep typ. Observera att a PipelineSession krävs för att uppnå detta. Detta ger likheter mellan pipeline-stegen och SDK:n.
    • Innan:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Efter:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Felsteg (villkorligt stopp av pipelinekörningen) - FailStep tillåter att en pipeline stoppas med en felstatus om ett villkor är uppfyllt, till exempel om modellpoängen är under en viss tröskel.

Lösningsöversikt

I den här lösningen är din ingångspunkt Amazon SageMaker Studio integrerad utvecklingsmiljö (IDE) för snabba experiment. Studio erbjuder en miljö för att hantera hela Pipelines-upplevelsen. Med Studio kan du kringgå AWS Management Console för hela din arbetsflödeshantering. För mer information om att hantera pipelines inifrån Studio, se Visa, spåra och kör SageMaker Pipelines i SageMaker Studio.

Följande diagram illustrerar högnivåarkitekturen för ML-arbetsflödet med de olika stegen för att träna och generera slutsatser med de nya funktionerna.

Pipelinen innehåller följande steg:

  1. Förbearbeta data för att bygga funktioner som krävs och dela upp data i tåg-, validerings- och testdatauppsättningar.
  2. Skapa ett träningsjobb med SageMaker XGBoost-ramverket.
  3. Utvärdera den tränade modellen med hjälp av testdatauppsättningen.
  4. Kontrollera om AUC-poängen ligger över en fördefinierad tröskel.
    • Om AUC-poängen är lägre än tröskeln, stoppa pipelinekörningen och markera den som misslyckad.
    • Om AUC-poängen är högre än tröskeln, skapa en SageMaker-modell och registrera den i SageMaker-modellregistret.
  5. Tillämpa batchtransformation på den givna datamängden med den modell som skapades i föregående steg.

Förutsättningar

För att följa detta inlägg behöver du ett AWS-konto med ett Studiodomän.

Pipelines är integrerad direkt med SageMaker-enheter och resurser, så du behöver inte interagera med några andra AWS-tjänster. Du behöver inte heller hantera några resurser eftersom det är en helt hanterad tjänst, vilket innebär att den skapar och hanterar resurser åt dig. För mer information om de olika SageMaker-komponenterna som båda är fristående Python API:er tillsammans med integrerade komponenter i Studio, se SageMaker produktsida.

Innan du börjar, installera SageMaker SDK version >= 2.104.0 och xlrd >=1.0.0 i Studio-anteckningsboken med hjälp av följande kodavsnitt:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML arbetsflöde

För det här inlägget använder du följande komponenter:

  • Dataförberedelse
    • SageMaker-bearbetning – SageMaker Processing är en helt hanterad tjänst som låter dig köra anpassade datatransformationer och funktionsutveckling för ML-arbetsbelastningar.
  • Modellbyggnad
  • Modellutbildning och utvärdering
    • Träning med ett klick – Den distribuerade träningsfunktionen från SageMaker. SageMaker tillhandahåller distribuerade utbildningsbibliotek för dataparallellism och modellparallellism. Biblioteken är optimerade för SageMakers träningsmiljö, hjälper till att anpassa dina distribuerade träningsjobb till SageMaker och förbättrar träningshastighet och genomströmning.
    • SageMaker-experiment – Experiment är en funktion hos SageMaker som låter dig organisera, spåra, jämföra och utvärdera dina ML-iterationer.
    • SageMaker batch-transformation – Batchtransformering eller offlinepoäng är en hanterad tjänst i SageMaker som låter dig förutsäga en större datauppsättning med dina ML-modeller.
  • Arbetsflödesorkestrering

En SageMaker-pipeline är en serie sammankopplade steg som definieras av en JSON-pipelinedefinition. Den kodar en pipeline med hjälp av en riktad acyklisk graf (DAG). DAG ger information om kraven för och sambanden mellan varje steg i pipelinen, och dess struktur bestäms av databeroendet mellan stegen. Dessa beroenden skapas när egenskaperna för ett stegs utdata skickas som indata till ett annat steg.

Följande diagram illustrerar de olika stegen i SageMaker-pipelinen (för ett churn-förutsägande användningsfall) där kopplingarna mellan stegen härleds av SageMaker baserat på ingångarna och utgångarna som definieras av stegdefinitionerna.

Nästa avsnitt går igenom att skapa varje steg i pipelinen och köra hela pipelinen när den väl har skapats.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Projektets struktur

Låt oss börja med projektstrukturen:

  • /sm-pipelines-end-to-end-exempel – Projektets namn
    • / data – Datauppsättningarna
    • /pipelines – Kodfilerna för pipelinekomponenter
      • /kundtjänst
        • preprocess.py
        • evaluate.py
    • sagemaker-pipelines-project.ipynb – En anteckningsbok som går igenom modelleringsarbetsflödet med Pipelines nya funktioner

Ladda ner datasetet

För att följa med det här inlägget måste du ladda ner och spara exempeluppsättning under datamappen i projektets hemkatalog, som sparar filen i Amazon Elastic File System (Amazon EFS) inom Studiomiljön.

Bygg rörledningskomponenterna

Nu är du redo att bygga pipelinekomponenterna.

Importera satser och deklarera parametrar och konstanter

Skapa en Studio-anteckningsbok som heter sagemaker-pipelines-project.ipynb i projektets hemkatalog. Ange följande kodblock i en cell och kör cellen för att ställa in SageMaker- och S3-klientobjekt, skapa PipelineSession, och ställ in platsen för S3-hinken med standardhinken som följer med en SageMaker-session:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines stöder parameterisering, vilket gör att du kan ange indataparametrar vid körning utan att ändra din pipelinekod. Du kan använda modulerna som finns tillgängliga under sagemaker.workflow.parameters modul, som t.ex ParameterInteger, ParameterFloatoch ParameterString, för att specificera pipelineparametrar för olika datatyper. Kör följande kod för att ställa in flera ingångsparametrar:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generera en batchdatauppsättning

Generera batchdatasetet som du använder senare i batchtransformeringssteget:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Ladda upp data till en S3-bucket

Ladda upp datamängderna till Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definiera ett bearbetningsskript och bearbetningssteg

I det här steget förbereder du ett Python-skript för att göra funktionsteknik, en varmkodning och kurera utbildningen, valideringen och testdelarna som ska användas för modellbyggande. Kör följande kod för att bygga ditt bearbetningsskript:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Kör sedan följande kodblock för att instansiera processorn och steget Pipelines för att köra bearbetningsskriptet. Eftersom bearbetningsskriptet är skrivet i Pandas använder du en SKLearnProcessor. Rörledningarna ProcessingStep funktionen tar följande argument: processorn, indata S3-platserna för rådatauppsättningar och utdata S3-platserna för att spara bearbetade datauppsättningar.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definiera ett träningssteg

Ställ in modellträning med hjälp av en SageMaker XGBoost-estimator och Pipelines TrainingStep fungera:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definiera utvärderingsskriptet och modellutvärderingssteget

Kör följande kodblock för att utvärdera modellen när den väl har tränats. Detta skript kapslar in logiken för att kontrollera om AUC-poängen når den angivna tröskeln.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Kör sedan följande kodblock för att instansiera processorn och steget Pipelines för att köra utvärderingsskriptet. Eftersom utvärderingsskriptet använder XGBoost-paketet använder du en ScriptProcessor tillsammans med XGBoost-bilden. Rörledningarna ProcessingStep funktionen tar följande argument: processorn, indata S3-platserna för rådatauppsättningar och utdata S3-platserna för att spara bearbetade datauppsättningar.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definiera ett steg för att skapa modell

Kör följande kodblock för att skapa en SageMaker-modell med hjälp av modellsteget Pipelines. Det här steget använder resultatet från utbildningssteget för att paketera modellen för implementering. Observera att värdet för argumentet instanstyp skickas med hjälp av parametern Pipelines som du definierade tidigare i inlägget.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definiera ett batchtransformeringssteg

Kör följande kodblock för att köra batchtransformation med den tränade modellen med batchingången som skapades i det första steget:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definiera ett registermodellsteg

Följande kod registrerar modellen i SageMaker-modellregistret med hjälp av Pipelines-modellsteget:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definiera ett felsteg för att stoppa pipelinen

Följande kod definierar Pipelines fail-steget för att stoppa pipelinekörningen med ett felmeddelande om AUC-poängen inte når den definierade tröskeln:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definiera ett villkorssteg för att kontrollera AUC-poäng

Följande kod definierar ett villkorssteg för att kontrollera AUC-poängen och villkorligt skapa en modell och köra en batchtransformation och registrera en modell i modellregistret, eller stoppa pipelinekörningen i ett misslyckat tillstånd:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Bygg och kör pipelinen

Efter att ha definierat alla komponentstegen kan du sätta ihop dem till ett Pipelines-objekt. Du behöver inte ange ordningen på pipeline eftersom Pipelines automatiskt härleder ordningsföljden baserat på beroenden mellan stegen.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Kör följande kod i en cell i din anteckningsbok. Om pipelinen redan finns uppdaterar koden pipelinen. Om pipelinen inte finns skapar den en ny.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Slutsats

I det här inlägget introducerade vi några av de nya funktionerna som nu är tillgängliga med Pipelines tillsammans med andra inbyggda SageMaker-funktioner och XGBoost-algoritmen för att utveckla, iterera och distribuera en modell för churn-förutsägelse. Lösningen kan utökas med ytterligare datakällor

att implementera ditt eget ML-arbetsflöde. För mer information om de steg som är tillgängliga i Pipelines arbetsflöde, se Amazon SageMaker Model Building Pipeline och SageMaker arbetsflöden. De AWS SageMaker exempel GitHub repo har fler exempel kring olika användningsfall som använder Pipelines.


Om författarna

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Jerry Peng är en mjukvaruutvecklingsingenjör med AWS SageMaker. Han fokuserar på att bygga heltäckande storskaliga MLOps-system från utbildning till modellövervakning i produktionen. Han brinner också för att föra ut konceptet MLOps till en bredare publik.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Dewen Qi är en mjukvaruutvecklingsingenjör i AWS. Hon fokuserar för närvarande på att utveckla och förbättra SageMaker Pipelines. Utanför jobbet tycker hon om att träna cello.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Gayatri Ghanakota är en Sr. Machine Learning Engineer med AWS Professional Services. Hon brinner för att utveckla, distribuera och förklara AI/ML-lösningar över olika domäner. Före denna roll ledde hon flera initiativ som datavetare och ML-ingenjör med globala toppföretag inom finans- och detaljhandeln. Hon har en magisterexamen i datavetenskap specialiserad på datavetenskap från University of Colorado, Boulder.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Rupinder Grewal är en Sr Ai/ML Specialist Solutions Architect med AWS. Han fokuserar för närvarande på servering av modeller och MLOps på SageMaker. Innan denna roll har han arbetat som Machine Learning Engineer med att bygga och hosta modeller. Utanför jobbet tycker han om att spela tennis och cykla på bergsstigar.

Nya funktioner för Amazon SageMaker Pipelines och Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Ray Li är en Sr. Data Scientist med AWS Professional Services. Hans specialitet fokuserar på att bygga och operationalisera AI/ML-lösningar för kunder av varierande storlek, allt från nystartade företag till företagsorganisationer. Utanför jobbet tycker Ray om att träna och resa.

Tidsstämpel:

Mer från AWS maskininlärning