Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK

Amazon SageMaker-rørledninger lar dataforskere og maskinlæringsingeniører (ML) automatisere treningsarbeidsflyter, noe som hjelper deg med å lage en repeterbar prosess for å orkestrere modellutviklingstrinn for rask eksperimentering og modellomskolering. Du kan automatisere hele arbeidsflyten for modellbygging, inkludert dataforberedelse, funksjonsutvikling, modellopplæring, modelljustering og modellvalidering, og katalogisere den i modellregisteret. Du kan konfigurere rørledninger til å kjøre automatisk med jevne mellomrom eller når visse hendelser utløses, eller du kan kjøre dem manuelt etter behov.

I dette innlegget fremhever vi noen av forbedringene til Amazon SageMaker SDK og introduser nye funksjoner i Amazon SageMaker Pipelines som gjør det enklere for ML-utøvere å bygge og trene ML-modeller.

Pipelines fortsetter å innovere sin utvikleropplevelse, og med disse siste utgivelsene kan du nå bruke tjenesten på en mer tilpasset måte:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Oppdatert dokumentasjon vedr PipelineVariable bruk for estimator-, prosessor-, tuner-, transformator- og modellbasisklasser, Amazon-modeller og rammemodeller. Det vil komme ytterligere endringer med nyere versjoner av SDK for å støtte alle underklasser av estimatorer og prosessorer.
  • 2.90.0 – Tilgjengelighet av ModellTrinn for integrerte modellressursoppretting og registreringsoppgaver.
  • 2.88.2 – Tilgjengelighet av PipelineSession for administrert interaksjon med SageMaker-enheter og ressurser.
  • 2.88.2 – Underklassekompatibilitet for arbeidsflyt pipeline jobbtrinn slik at du kan bygge jobbabstraksjoner og konfigurere og kjøre prosesserings-, opplærings-, transformerings- og tuningjobber som du ville gjort uten en pipeline.
  • 2.76.0 – Tilgjengelighet av FailStep å betinget stoppe en rørledning med feilstatus.

I dette innlegget leder vi deg gjennom en arbeidsflyt ved å bruke et eksempeldatasett med fokus på modellbygging og distribusjon for å demonstrere hvordan du implementerer Pipelines sine nye funksjoner. På slutten bør du ha nok informasjon til å kunne bruke disse nyere funksjonene og forenkle ML-arbeidsbelastningene dine.

Funksjoner oversikt

Pipelines tilbyr følgende nye funksjoner:

  • Annotering for pipelinevariabel – Enkelte metodeparametere godtar flere inngangstyper, inkludert PipelineVariables, og ytterligere dokumentasjon er lagt til for å avklare hvor PipelineVariables støttes både i den siste stabile versjonen av SageMaker SDK-dokumentasjonen og initsignaturen til funksjonene. For eksempel, i følgende TensorFlow-estimator, viser init-signaturen nå det model_dir og image_uri støtte PipelineVariables, mens de andre parameterne ikke gjør det. For mer informasjon, se TensorFlow Estimator.
    • Før:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Etter:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline økt - PipelineSession er et nytt konsept introdusert for å bringe enhet på tvers av SageMaker SDK og introduserer lat initialisering av rørledningsressursene (kjøreanropene fanges opp, men kjøres ikke før rørledningen er opprettet og kjørt). De PipelineSession konteksten arver SageMakerSession og implementerer praktiske metoder for å samhandle med andre SageMaker-enheter og ressurser, for eksempel opplæringsjobber, endepunkter og inndatasett lagret i Amazon enkel lagringstjeneste (Amazon S3).
  • Underklasse kompatibilitet med arbeidsflytpipeline jobbtrinn – Du kan nå bygge jobbabstraksjoner og konfigurere og kjøre prosesserings-, opplærings-, transformasjons- og tuningjobber som du ville gjort uten en pipeline.
    • For eksempel å lage et behandlingstrinn med SKLearnProcessor tidligere krevde følgende:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Som vi ser i den foregående koden, ProcessingStep trenger å gjøre stort sett samme forbehandlingslogikk som .run, bare uten å starte API-kallet for å starte jobben. Men med underklassekompatibilitet nå aktivert med arbeidsflytpipeline jobbtrinn, erklærer vi at step_args argument som tar forbehandlingslogikken med .run slik at du kan bygge en jobbabstraksjon og konfigurere den slik du ville brukt den uten Pipelines. Vi passerer også i pipeline_session, Som er en PipelineSession objekt, i stedet for sagemaker_session for å sørge for at kjøresamtalene fanges opp, men ikke kalles før rørledningen er opprettet og kjørt. Se følgende kode:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Modelltrinn (en strømlinjeformet tilnærming med modelloppretting og registreringstrinn) –Rørledninger tilbyr to trinnstyper å integrere med SageMaker-modeller: CreateModelStep og RegisterModel. Du kan nå oppnå begge deler kun ved å bruke ModelStep type. Merk at a PipelineSession kreves for å oppnå dette. Dette gir likhet mellom pipeline-trinnene og SDK.
    • Før:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Etter:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Feiltrinn (betinget stopp av rørledningen) - FailStep lar en rørledning stoppes med en feilstatus hvis en betingelse er oppfylt, for eksempel hvis modellpoengsummen er under en viss terskel.

Løsningsoversikt

I denne løsningen er inngangspunktet ditt Amazon SageMaker Studio integrert utviklingsmiljø (IDE) for rask eksperimentering. Studio tilbyr et miljø for å administrere ende-til-ende Pipelines-opplevelsen. Med Studio kan du omgå AWS-administrasjonskonsoll for hele arbeidsflyten din. For mer informasjon om administrasjon av pipelines fra Studio, se Se, spor og utfør SageMaker-rørledninger i SageMaker Studio.

Følgende diagram illustrerer høynivåarkitekturen til ML-arbeidsflyten med de forskjellige trinnene for å trene og generere slutninger ved å bruke de nye funksjonene.

Rørledningen inkluderer følgende trinn:

  1. Forbehandle data for å bygge nødvendige funksjoner og dele data inn i tog-, validerings- og testdatasett.
  2. Lag en treningsjobb med SageMaker XGBoost-rammeverket.
  3. Evaluer den trente modellen ved å bruke testdatasettet.
  4. Sjekk om AUC-poengsummen er over en forhåndsdefinert terskel.
    • Hvis AUC-poengsummen er mindre enn terskelen, stopper du rørledningen og merker den som mislykket.
    • Hvis AUC-poengsummen er høyere enn terskelen, oppretter du en SageMaker-modell og registrerer den i SageMaker-modellregisteret.
  5. Bruk batchtransformasjon på det gitte datasettet ved å bruke modellen opprettet i forrige trinn.

Forutsetninger

For å følge med på dette innlegget trenger du en AWS-konto med en Studio-domene.

Pipelines er integrert direkte med SageMaker-enheter og ressurser, slik at du ikke trenger å samhandle med andre AWS-tjenester. Du trenger heller ikke å administrere noen ressurser fordi det er en fullstendig administrert tjeneste, noe som betyr at den oppretter og administrerer ressurser for deg. For mer informasjon om de forskjellige SageMaker-komponentene som begge er frittstående Python APIer sammen med integrerte komponenter i Studio, se SageMaker produktside.

Før du begynner, installer SageMaker SDK versjon >= 2.104.0 og xlrd >=1.0.0 i Studio-notatboken ved å bruke følgende kodebit:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML arbeidsflyt

For dette innlegget bruker du følgende komponenter:

  • Dataforberedelse
    • SageMaker-behandling – SageMaker Processing er en fullstendig administrert tjeneste som lar deg kjøre tilpassede datatransformasjoner og funksjonsutvikling for ML-arbeidsbelastninger.
  • Modellbygg
  • Modelltrening og evaluering
    • Ett-klikks trening – Den distribuerte treningsfunksjonen fra SageMaker. SageMaker tilbyr distribuerte opplæringsbiblioteker for dataparallellisme og modellparallellisme. Bibliotekene er optimalisert for SageMaker treningsmiljø, hjelper til med å tilpasse de distribuerte treningsjobbene dine til SageMaker, og forbedrer treningshastigheten og gjennomstrømningen.
    • SageMaker eksperimenter – Eksperimenter er en funksjon i SageMaker som lar deg organisere, spore, sammenligne og evaluere ML-iterasjonene dine.
    • SageMaker batch-transformasjon – Batch-transformasjon eller offline-scoring er en administrert tjeneste i SageMaker som lar deg forutsi et større datasett ved å bruke ML-modellene dine.
  • Arbeidsflyt orkestrering

En SageMaker-rørledning er en serie sammenkoblede trinn definert av en JSON-rørledningsdefinisjon. Den koder for en rørledning ved hjelp av en rettet asyklisk graf (DAG). DAG gir informasjon om kravene til og relasjonene mellom hvert trinn i rørledningen, og strukturen bestemmes av dataavhengighetene mellom trinnene. Disse avhengighetene opprettes når egenskapene til et trinns utdata overføres som input til et annet trinn.

Følgende diagram illustrerer de forskjellige trinnene i SageMaker-pipelinen (for et brukstilfelle av churn-prediksjon) der forbindelsene mellom trinnene utledes av SageMaker basert på inngangene og utgangene definert av trinndefinisjonene.

De neste avsnittene går gjennom å lage hvert trinn i rørledningen og kjøre hele rørledningen når den er opprettet.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Prosjektstruktur

La oss starte med prosjektstrukturen:

  • /sm-pipelines-end-to-end-eksempel – Prosjektnavnet
    • / data – Datasettene
    • /rørledninger – Kodefilene for rørledningskomponenter
      • /kundemasse
        • preprocess.py
        • evaluate.py
    • sagemaker-pipelines-project.ipynb – En notatbok som går gjennom modelleringsarbeidsflyten med Pipelines nye funksjoner

Last ned datasettet

For å følge med på dette innlegget, må du laste ned og lagre eksempel datasett under datamappen i prosjektets hjemmekatalog, som lagrer filen i Amazon elastisk filsystem (Amazon EFS) i Studio-miljøet.

Bygg rørledningskomponentene

Nå er du klar til å bygge rørledningskomponentene.

Importer utsagn og deklarer parametere og konstanter

Lag en Studio-notatbok kalt sagemaker-pipelines-project.ipynb i prosjektets hjemmekatalog. Skriv inn følgende kodeblokk i en celle, og kjør cellen for å sette opp SageMaker- og S3-klientobjekter, opprett PipelineSession, og konfigurer S3-bøtteplasseringen ved å bruke standardbøtten som følger med en SageMaker-økt:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines støtter parameterisering, som lar deg spesifisere inngangsparametere under kjøring uten å endre pipeline-koden. Du kan bruke modulene som er tilgjengelige under sagemaker.workflow.parameters modul, som f.eks ParameterInteger, ParameterFloatog ParameterString, for å spesifisere rørledningsparametere for ulike datatyper. Kjør følgende kode for å sette opp flere inngangsparametere:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generer et batch-datasett

Generer batchdatasettet, som du bruker senere i batchtransformeringstrinnet:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Last opp data til en S3-bøtte

Last opp datasettene til Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definer et behandlingsskript og behandlingstrinn

I dette trinnet forbereder du et Python-skript for å utføre funksjonsutvikling, én varm koding, og kuratere opplæringen, valideringen og testdelingene som skal brukes til modellbygging. Kjør følgende kode for å bygge behandlingsskriptet ditt:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Deretter kjører du følgende kodeblokk for å instansiere prosessoren og Pipelines-trinnet for å kjøre behandlingsskriptet. Fordi behandlingsskriptet er skrevet i Pandas, bruker du en SKLearnProcessor. Rørledningene ProcessingStep funksjonen tar følgende argumenter: prosessoren, inndata S3-plasseringer for rådatasett og utdata S3-plasseringer for å lagre behandlede datasett.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definer et treningstrinn

Sett opp modelltrening ved å bruke en SageMaker XGBoost-estimator og Pipelines TrainingStep funksjon:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definer evalueringsskriptet og modellevalueringstrinnet

Kjør følgende kodeblokk for å evaluere modellen når den er trent. Dette skriptet innkapsler logikken for å sjekke om AUC-poengsummen oppfyller den angitte terskelen.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Deretter kjører du følgende kodeblokk for å instansiere prosessoren og Pipelines-trinnet for å kjøre evalueringsskriptet. Fordi evalueringsskriptet bruker XGBoost-pakken, bruker du en ScriptProcessor sammen med XGBoost-bildet. Rørledningene ProcessingStep funksjonen tar følgende argumenter: prosessoren, inndata S3-plasseringer for rådatasett og utdata S3-plasseringer for å lagre behandlede datasett.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definer et trinn for å lage modell

Kjør følgende kodeblokk for å lage en SageMaker-modell ved å bruke Pipelines-modelltrinnet. Dette trinnet utnytter resultatet fra opplæringstrinnet for å pakke modellen for distribusjon. Merk at verdien for instanstype-argumentet sendes ved å bruke Pipelines-parameteren du definerte tidligere i innlegget.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definer et batchtransformeringstrinn

Kjør følgende kodeblokk for å kjøre batchtransformasjon ved å bruke den opplærte modellen med batch-inndataene opprettet i det første trinnet:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definer et registermodelltrinn

Følgende kode registrerer modellen i SageMaker-modellregisteret ved å bruke Pipelines-modelltrinnet:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definer et feiltrinn for å stoppe rørledningen

Følgende kode definerer Pipelines fail-trinnet for å stoppe pipeline-kjøringen med en feilmelding hvis AUC-poengsummen ikke oppfyller den definerte terskelen:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definer et betingelsestrinn for å sjekke AUC-poengsum

Følgende kode definerer et betingelsestrinn for å sjekke AUC-poengsummen og betinget opprette en modell og kjøre en batch-transformasjon og registrere en modell i modellregisteret, eller stoppe pipeline-kjøringen i en mislykket tilstand:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Bygg og kjør rørledningen

Etter å ha definert alle komponenttrinnene, kan du sette dem sammen til et Pipelines-objekt. Du trenger ikke spesifisere rekkefølgen på pipeline fordi Pipelines automatisk utleder rekkefølgen basert på avhengighetene mellom trinnene.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Kjør følgende kode i en celle i notatboken. Hvis rørledningen allerede eksisterer, oppdaterer koden rørledningen. Hvis rørledningen ikke eksisterer, opprettes en ny.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

konklusjonen

I dette innlegget introduserte vi noen av de nye funksjonene som nå er tilgjengelige med Pipelines sammen med andre innebygde SageMaker-funksjoner og XGBoost-algoritmen for å utvikle, iterere og distribuere en modell for churn-prediksjon. Løsningen kan utvides med ytterligere datakilder

å implementere din egen ML-arbeidsflyt. For mer informasjon om trinnene som er tilgjengelige i arbeidsflyten for rørledninger, se Amazon SageMaker Model Building Pipeline og SageMaker arbeidsflyter. De AWS SageMaker eksempler GitHub repo har flere eksempler rundt ulike brukstilfeller som bruker Pipelines.


Om forfatterne

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Jerry Peng er en programvareutviklingsingeniør med AWS SageMaker. Han fokuserer på å bygge ende-til-ende storskala MLOps-system fra trening til modellovervåking i produksjon. Han brenner også for å bringe konseptet MLOps til et bredere publikum.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Dewen Qi er en programvareutviklingsingeniør i AWS. Hun fokuserer for tiden på å utvikle og forbedre SageMaker Pipelines. Utenom jobben liker hun å øve på cello.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Gayatri Ghanakota er en senior maskinlæringsingeniør med AWS Professional Services. Hun brenner for å utvikle, distribuere og forklare AI/ML-løsninger på tvers av ulike domener. Før denne rollen ledet hun flere initiativer som dataforsker og ML-ingeniør med globale toppfirmaer innen finans- og detaljhandel. Hun har en mastergrad i informatikk med spesialisering i datavitenskap fra University of Colorado, Boulder.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Rupinder Grewal er en Sr Ai/ML spesialistløsningsarkitekt med AWS. Han fokuserer for tiden på servering av modeller og MLOps på SageMaker. Før denne rollen har han jobbet som maskinlæringsingeniør med å bygge og hoste modeller. Utenom jobben liker han å spille tennis og sykle på fjellstier.

Nye funksjoner for Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Ray Li er en Sr. Data Scientist med AWS Professional Services. Hans spesialitet fokuserer på å bygge og operasjonalisere AI/ML-løsninger for kunder av varierende størrelse, alt fra startups til bedriftsorganisasjoner. Utenom jobben liker Ray trening og reise.

Tidstempel:

Mer fra AWS maskinlæring