Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK

Amazon SageMaker Pipelines giver datavidenskabsfolk og maskinlæringsingeniører (ML) mulighed for at automatisere træningsarbejdsgange, hvilket hjælper dig med at skabe en gentagelig proces til at orkestrere modeludviklingstrin til hurtig eksperimentering og modelgenoplæring. Du kan automatisere hele workflowet for modelopbygning, herunder dataforberedelse, funktionsudvikling, modeltræning, modeljustering og modelvalidering, og katalogisere det i modelregistret. Du kan konfigurere pipelines til at køre automatisk med regelmæssige intervaller, eller når bestemte hændelser udløses, eller du kan køre dem manuelt efter behov.

I dette indlæg fremhæver vi nogle af forbedringerne til Amazon SageMaker SDK og introducere nye funktioner i Amazon SageMaker Pipelines, der gør det nemmere for ML-udøvere at bygge og træne ML-modeller.

Pipelines fortsætter med at innovere sin udvikleroplevelse, og med disse seneste udgivelser kan du nu bruge tjenesten på en mere tilpasset måde:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Opdateret dokumentation vedr PipelineVariable brug for estimator, processor, tuner, transformer og modelbasisklasser, Amazon-modeller og rammemodeller. Der vil komme yderligere ændringer med nyere versioner af SDK for at understøtte alle underklasser af estimatorer og processorer.
  • 2.90.0 – Tilgængelighed af ModelStep til integrerede modelressourceoprettelse og registreringsopgaver.
  • 2.88.2 – Tilgængelighed af PipelineSession til styret interaktion med SageMaker-enheder og ressourcer.
  • 2.88.2 – Underklasse kompatibilitet til workflow pipeline job trin så du kan bygge jobabstraktioner og konfigurere og køre behandlings-, trænings-, transformations- og tuningjob, som du ville gøre uden en pipeline.
  • 2.76.0 – Tilgængelighed af FailStep at betinget stoppe en rørledning med en fejlstatus.

I dette indlæg fører vi dig gennem en arbejdsgang ved hjælp af et eksempeldatasæt med fokus på modelopbygning og implementering for at demonstrere, hvordan man implementerer Pipelines nye funktioner. Til sidst skulle du have nok information til at bruge disse nyere funktioner og forenkle dine ML-arbejdsbelastninger.

Funktioner oversigt

Pipelines tilbyder følgende nye funktioner:

  • Pipeline variabel annotation – Visse metodeparametre accepterer flere inputtyper, herunder PipelineVariables, og yderligere dokumentation er tilføjet for at afklare hvor PipelineVariables understøttes i både den seneste stabile version af SageMaker SDK-dokumentation og funktionernes init-signatur. For eksempel i den følgende TensorFlow-estimator viser init-signaturen det nu model_dir , image_uri support PipelineVariables, hvorimod de andre parametre ikke gør det. For mere information, se TensorFlow Estimator.
    • Før:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Efter:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline session - PipelineSession er et nyt koncept introduceret for at bringe enhed på tværs af SageMaker SDK og introducerer doven initialisering af pipelineressourcerne (kørselsopkaldene fanges, men køres ikke, før pipelinen er oprettet og kørt). Det PipelineSession kontekst arver SageMakerSession og implementerer praktiske metoder, så du kan interagere med andre SageMaker-enheder og ressourcer, såsom træningsjob, slutpunkter og inputdatasæt gemt i Amazon Simple Storage Service (Amazon S3).
  • Underklasse-kompatibilitet med workflow-pipeline-jobtrin – Du kan nu bygge jobabstraktioner og konfigurere og køre behandlings-, trænings-, transformations- og tuningjob, som du ville gøre uden en pipeline.
    • For eksempel oprettelse af et behandlingstrin med SKLearnProcessor tidligere krævet følgende:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Som vi ser i den foregående kode, ProcessingStep skal stort set lave den samme forbehandlingslogik som .run, bare uden at starte API-kaldet for at starte jobbet. Men med underklasse-kompatibilitet nu aktiveret med workflow-pipeline-jobtrin, erklærer vi step_args argument, der tager forbehandlingslogikken med .run, så du kan bygge en jobabstraktion og konfigurere den, som du ville bruge den uden Pipelines. Vi passerer også i pipeline_session, Som er en PipelineSession objekt i stedet for sagemaker_session for at sikre, at kørekaldene fanges, men ikke kaldes, før pipelinen er oprettet og kørt. Se følgende kode:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Modeltrin (en strømlinet tilgang med modeloprettelse og registreringstrin) –Rørledninger tilbyder to trintyper til at integrere med SageMaker-modeller: CreateModelStep , RegisterModel. Du kan nu opnå begge dele ved kun at bruge ModelStep type. Bemærk at a PipelineSession er påkrævet for at opnå dette. Dette giver lighed mellem pipeline-trinene og SDK'et.
    • Før:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Efter:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fejltrin (betinget stop af rørledningen) - FailStep tillader en pipeline at blive stoppet med en fejlstatus, hvis en betingelse er opfyldt, såsom hvis modelscore er under en vis tærskel.

Løsningsoversigt

I denne løsning er dit indgangspunkt Amazon SageMaker Studio integreret udviklingsmiljø (IDE) til hurtige eksperimenter. Studio tilbyder et miljø til at styre end-to-end Pipelines-oplevelsen. Med Studio kan du omgå AWS Management Console til hele din workflow-styring. For mere information om håndtering af pipelines fra Studio, se Se, spor og udfør SageMaker Pipelines i SageMaker Studio.

Følgende diagram illustrerer højniveauarkitekturen i ML-arbejdsgangen med de forskellige trin til at træne og generere slutninger ved hjælp af de nye funktioner.

Rørledningen omfatter følgende trin:

  1. Forbehandle data for at opbygge nødvendige funktioner og opdele data i tog-, validerings- og testdatasæt.
  2. Opret et træningsjob med SageMaker XGBoost-rammeværket.
  3. Evaluer den trænede model ved hjælp af testdatasættet.
  4. Tjek, om AUC-scoren er over en foruddefineret tærskel.
    • Hvis AUC-scoren er mindre end tærsklen, skal du stoppe pipelinekørslen og markere den som mislykket.
    • Hvis AUC-scoren er større end tærsklen, skal du oprette en SageMaker-model og registrere den i SageMaker-modelregistret.
  5. Anvend batchtransformation på det givne datasæt ved hjælp af modellen oprettet i det foregående trin.

Forudsætninger

For at følge med i dette indlæg skal du have en AWS-konto med en Studio domæne.

Pipelines er integreret direkte med SageMaker-enheder og ressourcer, så du ikke behøver at interagere med andre AWS-tjenester. Du behøver heller ikke at administrere nogen ressourcer, fordi det er en fuldt administreret tjeneste, hvilket betyder, at den opretter og administrerer ressourcer for dig. For mere information om de forskellige SageMaker-komponenter, der begge er selvstændige Python API'er sammen med integrerede komponenter i Studio, se SageMaker produktside.

Inden du går i gang, skal du installere SageMaker SDK version >= 2.104.0 og xlrd >=1.0.0 i Studio-notesbogen ved hjælp af følgende kodestykke:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML arbejdsgang

Til dette indlæg bruger du følgende komponenter:

  • Forberedelse af data
    • SageMaker-behandling – SageMaker Processing er en fuldt administreret tjeneste, der giver dig mulighed for at køre tilpassede datatransformationer og funktionsudvikling til ML-arbejdsbelastninger.
  • Modelbygning
  • Modeltræning og evaluering
    • Et-klik træning – SageMaker-distribuerede træningsfunktion. SageMaker leverer distribuerede træningsbiblioteker for dataparallelisme og modelparallelisme. Bibliotekerne er optimeret til SageMaker træningsmiljøet, hjælper med at tilpasse dine distribuerede træningsjob til SageMaker og forbedre træningshastighed og gennemstrømning.
    • SageMaker-eksperimenter – Eksperimenter er en funktion i SageMaker, der lader dig organisere, spore, sammenligne og evaluere dine ML-iterationer.
    • SageMaker batch transformation – Batchtransformation eller offline-scoring er en administreret tjeneste i SageMaker, der lader dig forudsige et større datasæt ved hjælp af dine ML-modeller.
  • Workflow orkestrering

En SageMaker-pipeline er en række indbyrdes forbundne trin defineret af en JSON-pipelinedefinition. Den koder for en pipeline ved hjælp af en rettet acyklisk graf (DAG). DAG'en giver information om kravene til og relationerne mellem hvert trin i pipelinen, og dens struktur bestemmes af dataafhængighederne mellem trinene. Disse afhængigheder oprettes, når egenskaberne for et trins output overføres som input til et andet trin.

Følgende diagram illustrerer de forskellige trin i SageMaker-pipelinen (til et tilfælde af churn-forudsigelse), hvor forbindelserne mellem trinnene udledes af SageMaker baseret på input og output defineret af trindefinitionerne.

De næste afsnit gennemgår oprettelse af hvert trin i pipelinen og kørsel af hele pipelinen, når den er oprettet.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Projektets struktur

Lad os starte med projektstrukturen:

  • /sm-pipelines-end-to-end-eksempel – Projektnavnet
    • /data – Datasættene
    • /rørledninger – Kodefilerne til pipeline-komponenter
      • /kundekreds
        • preprocess.py
        • evaluate.py
    • sagemaker-pipelines-project.ipynb – En notesbog, der går gennem modelleringsarbejdsgangen ved hjælp af Pipelines nye funktioner

Download datasættet

For at følge med i dette indlæg skal du downloade og gemme eksempeldatasæt under datamappen i projektets hjemmemappe, som gemmer filen i Amazon Elastic File System (Amazon EFS) i Studio-miljøet.

Byg rørledningskomponenterne

Nu er du klar til at bygge rørledningskomponenterne.

Importer udsagn og erklær parametre og konstanter

Opret en Studio-notesbog kaldet sagemaker-pipelines-project.ipynb i projektets hjemmemappe. Indtast følgende kodeblok i en celle, og kør cellen for at opsætte SageMaker- og S3-klientobjekter, opret PipelineSession, og konfigurer S3-bøtteplaceringen ved hjælp af standardbøtten, der følger med en SageMaker-session:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines understøtter parametrisering, som giver dig mulighed for at specificere inputparametre under kørsel uden at ændre din pipelinekode. Du kan bruge de tilgængelige moduler under sagemaker.workflow.parameters modul, som f.eks ParameterInteger, ParameterFloatog ParameterString, for at specificere pipeline-parametre for forskellige datatyper. Kør følgende kode for at konfigurere flere inputparametre:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generer et batch-datasæt

Generer batchdatasættet, som du bruger senere i batchtransformationstrinnet:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Upload data til en S3-bøtte

Upload datasættene til Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definer et behandlingsscript og et behandlingstrin

I dette trin forbereder du et Python-script til at udføre feature engineering, én hot-encoding og kuratere de trænings-, validerings- og testopdelinger, der skal bruges til modelbygning. Kør følgende kode for at bygge dit behandlingsscript:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Kør derefter følgende kodeblok for at instansiere processoren og Pipelines-trinnet for at køre behandlingsscriptet. Fordi behandlingsscriptet er skrevet i Pandas, bruger du en SKLearnProcessor. Rørledningerne ProcessingStep funktion tager følgende argumenter: processoren, input-S3-placeringerne for rå datasæt og output-S3-placeringerne for at gemme behandlede datasæt.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definer et træningstrin

Konfigurer modeltræning ved hjælp af en SageMaker XGBoost-estimator og Pipelines TrainingStep fungere:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definer evalueringsscriptet og modelevalueringstrinnet

Kør følgende kodeblok for at evaluere modellen, når den er trænet. Dette script indkapsler logikken for at kontrollere, om AUC-scoren når den angivne tærskel.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Kør derefter følgende kodeblok for at instansiere processoren og Pipelines-trinnet for at køre evalueringsscriptet. Fordi evalueringsscriptet bruger XGBoost-pakken, bruger du en ScriptProcessor sammen med XGBoost-billedet. Rørledningerne ProcessingStep funktion tager følgende argumenter: processoren, input-S3-placeringerne for rå datasæt og output-S3-placeringerne for at gemme behandlede datasæt.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definer et oprette modeltrin

Kør følgende kodeblok for at oprette en SageMaker-model ved hjælp af Pipelines-modeltrinnet. Dette trin udnytter outputtet fra træningstrinnet til at pakke modellen til implementering. Bemærk, at værdien for instanstypeargumentet videregives ved hjælp af den Pipelines-parameter, du definerede tidligere i indlægget.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definer et batchtransformationstrin

Kør følgende kodeblok for at køre batchtransformation ved hjælp af den trænede model med batchinput oprettet i det første trin:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definer et registermodeltrin

Følgende kode registrerer modellen i SageMaker-modelregistret ved hjælp af Pipelines-modeltrinnet:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definer et fejltrin for at stoppe pipelinen

Følgende kode definerer Pipelines fail-trinnet for at stoppe pipeline-kørslen med en fejlmeddelelse, hvis AUC-scoren ikke når den definerede tærskel:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definer et betingelsestrin for at kontrollere AUC-score

Følgende kode definerer et betingelsestrin til at kontrollere AUC-scoren og betinget oprette en model og køre en batchtransformation og registrere en model i modelregistret eller stoppe pipelinekørslen i en mislykket tilstand:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Byg og kør rørledningen

Efter at have defineret alle komponenttrinene, kan du samle dem til et Pipelines-objekt. Du behøver ikke angive rækkefølgen af ​​pipeline, fordi Pipelines automatisk udleder rækkefølgen baseret på afhængighederne mellem trinnene.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Kør følgende kode i en celle i din notesbog. Hvis pipelinen allerede eksisterer, opdaterer koden pipelinen. Hvis pipelinen ikke eksisterer, opretter den en ny.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Konklusion

I dette indlæg introducerede vi nogle af de nye funktioner, der nu er tilgængelige med Pipelines, sammen med andre indbyggede SageMaker-funktioner og XGBoost-algoritmen til at udvikle, iterere og implementere en model til churn-forudsigelse. Løsningen kan udvides med yderligere datakilder

at implementere dit eget ML-workflow. For flere detaljer om de tilgængelige trin i Pipelines workflow, se Amazon SageMaker Model Building Pipeline , SageMaker arbejdsgange. Det Eksempler på AWS SageMaker GitHub repo har flere eksempler omkring forskellige use cases ved hjælp af Pipelines.


Om forfatterne

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Jerry Peng er softwareudviklingsingeniør hos AWS SageMaker. Han fokuserer på at bygge ende-til-ende store MLOps-systemer fra træning til modelovervågning i produktionen. Han brænder også for at bringe konceptet MLOps til et bredere publikum.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Dewen Qi er softwareudviklingsingeniør i AWS. Hun fokuserer i øjeblikket på at udvikle og forbedre SageMaker Pipelines. Uden for arbejdet nyder hun at øve sig på cello.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Gayatri Ghanakota er en Sr. Machine Learning Engineer med AWS Professional Services. Hun brænder for at udvikle, implementere og forklare AI/ML-løsninger på tværs af forskellige domæner. Forud for denne rolle ledede hun flere initiativer som dataforsker og ML-ingeniør hos top globale firmaer inden for finans- og detailområdet. Hun har en mastergrad i datalogi med speciale i datavidenskab fra University of Colorado, Boulder.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Rupinder Grewal er en Sr Ai/ML Specialist Solutions Architect hos AWS. Han fokuserer i øjeblikket på servering af modeller og MLOps på SageMaker. Før denne rolle har han arbejdet som Machine Learning Engineer ved at bygge og hoste modeller. Uden for arbejdet nyder han at spille tennis og cykle på bjergstier.

Nye funktioner til Amazon SageMaker Pipelines og Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Ray Li er Sr. Data Scientist med AWS Professional Services. Hans speciale fokuserer på at bygge og operationalisere AI/ML-løsninger til kunder af varierende størrelse, lige fra startups til virksomhedsorganisationer. Uden for arbejdet nyder Ray fitness og rejser.

Tidsstempel:

Mere fra AWS maskinindlæring