Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK Amazon SageMaker

Pipeline di Amazon SageMaker consente ai data scientist e agli ingegneri del machine learning (ML) di automatizzare i flussi di lavoro di formazione, aiutandoti a creare un processo ripetibile per orchestrare le fasi di sviluppo del modello per una sperimentazione rapida e la riqualificazione del modello. Puoi automatizzare l'intero flusso di lavoro di creazione del modello, inclusa la preparazione dei dati, la progettazione delle funzionalità, l'addestramento del modello, l'ottimizzazione e la convalida del modello, nonché catalogarlo nel registro del modello. È possibile configurare le pipeline in modo che vengano eseguite automaticamente a intervalli regolari o quando vengono attivati ​​determinati eventi oppure è possibile eseguirle manualmente in base alle esigenze.

In questo post evidenziamo alcuni dei miglioramenti apportati a Amazon Sage Maker SDK e introducono nuove funzionalità di Amazon SageMaker Pipelines che semplificano la creazione e l'addestramento di modelli ML per i professionisti del machine learning.

Pipelines continua a innovare la propria esperienza di sviluppo e, con queste recenti versioni, ora puoi utilizzare il servizio in modo più personalizzato:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Documentazione aggiornata su PipelineVariable utilizzo per stimatore, processore, sintonizzatore, trasformatore e classi base del modello, modelli Amazon e modelli framework. Verranno apportate ulteriori modifiche alle versioni più recenti dell'SDK per supportare tutte le sottoclassi di stimatori e processori.
  • 2.90.0 - Disponibilità di ModelloStep per le attività di creazione e registrazione delle risorse del modello integrato.
  • 2.88.2 - Disponibilità di Pipeline Session per l'interazione gestita con entità e risorse SageMaker.
  • 2.88.2 – Compatibilità delle sottoclassi per fasi di lavoro della pipeline del flusso di lavoro in questo modo puoi creare astrazioni di processi e configurare ed eseguire processi di elaborazione, formazione, trasformazione e ottimizzazione come faresti senza una pipeline.
  • 2.76.0 - Disponibilità di FailStep per arrestare in modo condizionale una pipeline con uno stato di errore.

In questo post ti guideremo attraverso un flusso di lavoro utilizzando un set di dati di esempio con particolare attenzione alla creazione e alla distribuzione del modello per dimostrare come implementare le nuove funzionalità di Pipelines. Alla fine, dovresti disporre di informazioni sufficienti per utilizzare con successo queste funzionalità più recenti e semplificare i carichi di lavoro ML.

Panoramica delle funzionalità

Pipelines offre le seguenti nuove funzionalità:

  • Annotazione della variabile pipeline – Alcuni parametri del metodo accettano più tipi di input, inclusi PipelineVariablesed è stata aggiunta ulteriore documentazione per chiarire dove PipelineVariables sono supportati sia nell'ultima versione stabile della documentazione dell'SDK di SageMaker che nella firma init delle funzioni. Ad esempio, nel seguente stimatore TensorFlow, la firma init ora lo mostra model_dir ed image_uri supporto PipelineVariables, mentre gli altri parametri no. Per ulteriori informazioni, fare riferimento a Stimatore del flusso tensoriale.
    • Prima:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Dopo:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sessione della pipeline - Pipeline Session è un nuovo concetto introdotto per portare unità nell'SDK di SageMaker e introduce l'inizializzazione lenta delle risorse della pipeline (le chiamate di esecuzione vengono acquisite ma non eseguite finché la pipeline non viene creata ed eseguita). IL PipelineSession contesto eredita il SageMakerSession e implementa metodi convenienti per interagire con altre entità e risorse SageMaker, come processi di formazione, endpoint e set di dati di input archiviati in Servizio di archiviazione semplice Amazon (Amazon S3).
  • Compatibilità delle sottoclassi con le fasi di lavoro della pipeline del flusso di lavoro – Ora puoi creare astrazioni di lavoro e configurare ed eseguire lavori di elaborazione, formazione, trasformazione e ottimizzazione come faresti senza una pipeline.
    • Ad esempio, creando una fase di elaborazione con SKLearnProcessor precedentemente richiesto quanto segue:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Come vediamo nel codice precedente, ProcessingStep deve eseguire fondamentalmente la stessa logica di preelaborazione di .run, semplicemente senza avviare la chiamata API per avviare il lavoro. Ma con la compatibilità delle sottoclassi ora abilitata con le fasi di lavoro della pipeline del flusso di lavoro, dichiariamo il file step_args argomento che accetta la logica di preelaborazione con .run in modo da poter creare un'astrazione del lavoro e configurarla come la utilizzeresti senza Pipelines. Passiamo anche nel pipeline_session, Che è un PipelineSession oggetto, invece di sagemaker_session per assicurarsi che le chiamate di esecuzione vengano acquisite ma non chiamate finché la pipeline non viene creata ed eseguita. Vedere il seguente codice:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Fase del modello (un approccio semplificato con fasi di creazione e registrazione del modello) –Pipelines offre due tipi di passaggi per l'integrazione con i modelli SageMaker: CreateModelStep ed RegisterModel. Ora puoi ottenere entrambi utilizzando solo il file ModelStep tipo. Si noti che a PipelineSession è necessario per raggiungere questo obiettivo. Ciò comporta somiglianze tra i passaggi della pipeline e l'SDK.
    • Prima:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Dopo:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail step (arresto condizionale della corsa della pipeline) - FailStep consente di arrestare una pipeline con uno stato di errore se viene soddisfatta una condizione, ad esempio se il punteggio del modello è inferiore a una determinata soglia.

Panoramica della soluzione

In questa soluzione, il punto di ingresso è il Amazon Sage Maker Studio ambiente di sviluppo integrato (IDE) per una sperimentazione rapida. Studio offre un ambiente per gestire l'esperienza Pipelines end-to-end. Con Studio puoi ignorare il file Console di gestione AWS per la gestione dell’intero flusso di lavoro. Per ulteriori informazioni sulla gestione delle pipeline da Studio, fare riferimento a Visualizza, traccia ed esegui pipeline SageMaker in SageMaker Studio.

Il diagramma seguente illustra l'architettura di alto livello del flusso di lavoro ML con i diversi passaggi per addestrare e generare inferenze utilizzando le nuove funzionalità.

La pipeline comprende i seguenti passaggi:

  1. Preelabora i dati per creare le funzionalità richieste e suddividi i dati in set di dati di training, convalida e test.
  2. Crea un lavoro di formazione con il framework SageMaker XGBoost.
  3. Valutare il modello addestrato utilizzando il set di dati di test.
  4. Controlla se il punteggio AUC è superiore a una soglia predefinita.
    • Se il punteggio AUC è inferiore alla soglia, interrompere l'esecuzione della pipeline e contrassegnarla come non riuscita.
    • Se il punteggio AUC è maggiore della soglia, crea un modello SageMaker e registralo nel registro dei modelli SageMaker.
  5. Applicare la trasformazione batch al set di dati specificato utilizzando il modello creato nel passaggio precedente.

Prerequisiti

Per seguire questo post, è necessario un account AWS con a Dominio Studio.

Pipelines è integrato direttamente con entità e risorse SageMaker, quindi non è necessario interagire con altri servizi AWS. Inoltre, non è necessario gestire alcuna risorsa perché è un servizio completamente gestito, il che significa che crea e gestisce le risorse per te. Per ulteriori informazioni sui vari componenti SageMaker che sono sia API Python autonome sia componenti integrati di Studio, consultare la Pagina del prodotto SageMaker.

Prima di iniziare, installa SageMaker SDK versione >= 2.104.0 e xlrd >=1.0.0 nel notebook Studio utilizzando il seguente snippet di codice:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Flusso di lavoro di machine learning

Per questo post, utilizzerai i seguenti componenti:

  • Preparazione dei dati
    • Elaborazione di SageMaker – SageMaker Processing è un servizio completamente gestito che ti consente di eseguire trasformazioni di dati personalizzate e ingegneria di funzionalità per carichi di lavoro ML.
  • Modellismo
  • Formazione e valutazione del modello
    • Formazione con un clic – La funzionalità di formazione distribuita di SageMaker. SageMaker fornisce librerie di training distribuite per il parallelismo dei dati e il parallelismo dei modelli. Le librerie sono ottimizzate per l'ambiente di formazione SageMaker, aiutano ad adattare i lavori di formazione distribuiti a SageMaker e migliorano la velocità e la produttività della formazione.
    • Esperimenti di SageMaker – Experiments è una funzionalità di SageMaker che ti consente di organizzare, monitorare, confrontare e valutare le tue iterazioni ML.
    • Trasformazione batch SageMaker – La trasformazione batch o il punteggio offline è un servizio gestito in SageMaker che ti consente di effettuare previsioni su un set di dati più grande utilizzando i tuoi modelli ML.
  • Orchestrazione del flusso di lavoro

Una pipeline SageMaker è una serie di passaggi interconnessi definiti da una definizione di pipeline JSON. Codifica una pipeline utilizzando un grafico aciclico diretto (DAG). Il DAG fornisce informazioni sui requisiti e sulle relazioni tra ciascuna fase della pipeline e la sua struttura è determinata dalle dipendenze dei dati tra le fasi. Queste dipendenze vengono create quando le proprietà dell'output di un passaggio vengono passate come input a un altro passaggio.

Il diagramma seguente illustra i diversi passaggi nella pipeline SageMaker (per un caso d'uso di previsione del tasso di abbandono) in cui le connessioni tra i passaggi vengono dedotte da SageMaker in base agli input e agli output definiti dalle definizioni dei passaggi.

Le sezioni successive illustrano la creazione di ogni passaggio della pipeline e l'esecuzione dell'intera pipeline una volta creata.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.

Struttura del progetto

Partiamo dalla struttura del progetto:

  • /sm-pipelines-end-to-end-esempio – Il nome del progetto
    • /dati – I set di dati
    • /condutture – I file di codice per i componenti della pipeline
      • /fidelizzazione dei clienti
        • preprocess.py
        • valutare.py
    • sagemaker-pipelines-project.ipynb – Un notebook che illustra il flusso di lavoro di modellazione utilizzando le nuove funzionalità di Pipelines

Scarica il dataset

Per seguire questo post, devi scaricare e salvare il set di dati di esempio nella cartella dati all'interno della directory home del progetto, che salva il file in File system elastico Amazon (Amazon EFS) all'interno dell'ambiente Studio.

Costruisci i componenti della pipeline

Ora sei pronto per creare i componenti della pipeline.

Importa istruzioni e dichiara parametri e costanti

Crea un taccuino di Studio chiamato sagemaker-pipelines-project.ipynb all'interno della directory home del progetto. Immettere il seguente blocco di codice in una cella ed eseguire la cella per configurare gli oggetti client SageMaker e S3, creare PipelineSessione configura la posizione del bucket S3 utilizzando il bucket predefinito fornito con una sessione SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Le pipeline supportano la parametrizzazione, che consente di specificare i parametri di input in fase di esecuzione senza modificare il codice della pipeline. È possibile utilizzare i moduli disponibili sotto il sagemaker.workflow.parameters modulo, come ParameterInteger, ParameterFloate ParameterString, per specificare i parametri della pipeline di vari tipi di dati. Eseguire il codice seguente per impostare più parametri di input:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Genera un set di dati batch

Genera il set di dati batch, che utilizzerai successivamente nella fase di trasformazione batch:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Carica i dati in un bucket S3

Carica i set di dati su Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definire uno script di elaborazione e una fase di elaborazione

In questa fase, prepari uno script Python per eseguire l'ingegneria delle funzionalità, una codifica a caldo e curare la formazione, la convalida e le suddivisioni dei test da utilizzare per la creazione del modello. Esegui il codice seguente per creare lo script di elaborazione:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Successivamente, esegui il seguente blocco di codice per creare un'istanza del processore e il passaggio Pipelines per eseguire lo script di elaborazione. Poiché lo script di elaborazione è scritto in Panda, usi a SKLearn Processor. Gli oleodotti ProcessingStep La funzione accetta i seguenti argomenti: il processore, le posizioni S3 di input per i set di dati grezzi e le posizioni S3 di output per salvare i set di dati elaborati.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definire una fase di formazione

Configura l'addestramento del modello utilizzando uno stimatore SageMaker XGBoost e le pipeline TrainingStep funzione:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definire lo script di valutazione e la fase di valutazione del modello

Esegui il seguente blocco di codice per valutare il modello una volta addestrato. Questo script incapsula la logica per verificare se il punteggio AUC soddisfa la soglia specificata.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Successivamente, esegui il seguente blocco di codice per creare un'istanza del processore e il passaggio Pipelines per eseguire lo script di valutazione. Poiché lo script di valutazione utilizza il pacchetto XGBoost, usi a ScriptProcessor insieme all'immagine XGBoost. Le condutture ProcessingStep La funzione accetta i seguenti argomenti: il processore, le posizioni S3 di input per i set di dati grezzi e le posizioni S3 di output per salvare i set di dati elaborati.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definire una fase di creazione del modello

Esegui il seguente blocco di codice per creare un modello SageMaker utilizzando il passaggio del modello Pipelines. Questo passaggio utilizza l'output del passaggio di training per creare un pacchetto del modello per la distribuzione. Tieni presente che il valore per l'argomento del tipo di istanza viene passato utilizzando il parametro Pipelines definito in precedenza nel post.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definire una fase di trasformazione batch

Esegui il seguente blocco di codice per eseguire la trasformazione batch utilizzando il modello addestrato con l'input batch creato nel primo passaggio:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definire un passaggio del modello di registro

Il codice seguente registra il modello nel registro dei modelli SageMaker utilizzando il passaggio del modello Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definire un passaggio di errore per arrestare la pipeline

Il codice seguente definisce il passaggio Pipelines fail per interrompere l'esecuzione della pipeline con un messaggio di errore se il punteggio AUC non soddisfa la soglia definita:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definire un passaggio condizionale per verificare il punteggio AUC

Il codice seguente definisce un passaggio di condizione per verificare il punteggio AUC e creare in modo condizionale un modello ed eseguire una trasformazione batch e registrare un modello nel registro del modello o interrompere l'esecuzione della pipeline in uno stato non riuscito:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Costruisci ed esegui la pipeline

Dopo aver definito tutti i passaggi del componente, puoi assemblarli in un oggetto Pipelines. Non è necessario specificare l'ordine della pipeline perché Pipelines deduce automaticamente la sequenza dell'ordine in base alle dipendenze tra i passaggi.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Esegui il seguente codice in una cella del tuo notebook. Se la pipeline esiste già, il codice aggiorna la pipeline. Se la pipeline non esiste, ne crea una nuova.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Conclusione

In questo post, abbiamo introdotto alcune delle nuove funzionalità ora disponibili con Pipelines insieme ad altre funzionalità integrate di SageMaker e all'algoritmo XGBoost per sviluppare, iterare e distribuire un modello per la previsione del tasso di abbandono. La soluzione può essere estesa con ulteriori origini dati

per implementare il tuo flusso di lavoro ML. Per ulteriori dettagli sui passaggi disponibili nel flusso di lavoro Pipeline, fare riferimento a Pipeline di modellismo Amazon SageMaker ed Flussi di lavoro di SageMaker. Esempi di AWS SageMaker Il repository GitHub contiene più esempi di vari casi d'uso che utilizzano Pipelines.


Informazioni sugli autori

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.Jerry Peng è un ingegnere di sviluppo software con AWS SageMaker. Si concentra sulla creazione di sistemi MLOps end-to-end su larga scala, dalla formazione al monitoraggio dei modelli in produzione. La sua passione è anche portare il concetto di MLOps a un pubblico più ampio.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.Dewen Qi è un ingegnere di sviluppo software in AWS. Attualmente si concentra sullo sviluppo e sul miglioramento delle pipeline SageMaker. Al di fuori del lavoro, le piace esercitarsi nel violoncello.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.Gayatri Ghanakota è un ingegnere esperto di machine learning con AWS Professional Services. È appassionata di sviluppo, distribuzione e spiegazione di soluzioni AI/ML in vari domini. Prima di ricoprire questo ruolo, ha guidato numerose iniziative come data scientist e ingegnere ML con le migliori aziende globali nel settore finanziario e retail. Ha conseguito un master in Informatica con specializzazione in Data Science presso l'Università del Colorado, Boulder.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.Rupinder Grewal è un Senior Specialist Solutions Architect di Ai/ML con AWS. Attualmente si concentra sulla pubblicazione di modelli e MLOp su SageMaker. Prima di questo ruolo ha lavorato come Machine Learning Engineer, costruendo e ospitando modelli. Al di fuori del lavoro gli piace giocare a tennis e andare in bicicletta sui sentieri di montagna.

Nuove funzionalità per le pipeline Amazon SageMaker e l'SDK PlatoBlockchain Data Intelligence di Amazon SageMaker. Ricerca verticale. Ai.Ray Li è un Data Scientist senior con AWS Professional Services. La sua specialità si concentra sulla creazione e l'operatività di soluzioni AI/ML per clienti di varie dimensioni, dalle startup alle organizzazioni aziendali. Al di fuori del lavoro, Ray ama il fitness e viaggiare.

Timestamp:

Di più da Apprendimento automatico di AWS