Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK

Pipelines Amazon SageMaker permite oamenilor de știință de date și inginerilor de învățare automată (ML) să automatizeze fluxurile de lucru de formare, ceea ce vă ajută să creați un proces repetabil pentru a orchestra pașii de dezvoltare a modelului pentru experimentare rapidă și reinstruire a modelului. Puteți automatiza întregul flux de lucru pentru construirea modelului, inclusiv pregătirea datelor, ingineria caracteristicilor, antrenamentul modelului, reglarea modelului și validarea modelului și îl puteți cataloga în registrul de modele. Puteți configura conductele să ruleze automat la intervale regulate sau când anumite evenimente sunt declanșate sau le puteți rula manual după cum este necesar.

În această postare, evidențiem câteva dintre îmbunătățirile aduse Amazon SageMaker SDK și introduceți noi funcții ale Amazon SageMaker Pipelines care facilitează construirea și instruirea modelelor ML pentru practicanții ML.

Pipelines continuă să-și inoveze experiența de dezvoltator și, cu aceste versiuni recente, acum puteți utiliza serviciul într-un mod mai personalizat:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Documentație actualizată pe PipelineVariable utilizarea pentru estimator, procesor, tuner, transformator și clase de bază de model, modele Amazon și modele cadru. Vor exista modificări suplimentare cu versiunile mai noi ale SDK-ului pentru a accepta toate subclasele de estimatori și procesoare.
  • 2.90.0 - Disponibilitatea ModelStep pentru sarcini integrate de creare și înregistrare a resurselor modelului.
  • 2.88.2 - Disponibilitatea PipelineSession pentru interacțiunea gestionată cu entitățile și resursele SageMaker.
  • 2.88.2 – Compatibilitate cu subclase pentru pașii de lucru ale fluxului de lucru astfel încât să puteți construi abstracții de job și să configurați și să rulați joburi de procesare, antrenare, transformare și reglare așa cum ați face fără o conductă.
  • 2.76.0 - Disponibilitatea FailStep pentru a opri condiționat o conductă cu o stare de defecțiune.

În această postare, vă prezentăm un flux de lucru folosind un exemplu de set de date cu accent pe construirea și implementarea modelului pentru a demonstra cum să implementați noile caracteristici ale Pipelines. Până la sfârșit, ar trebui să aveți suficiente informații pentru a utiliza cu succes aceste funcții mai noi și pentru a vă simplifica sarcinile de lucru ML.

Prezentare generală a caracteristicilor

Pipelines oferă următoarele caracteristici noi:

  • Adnotare variabilă pipeline – Anumiți parametri ai metodei acceptă mai multe tipuri de intrare, inclusiv PipelineVariables, iar documentația suplimentară a fost adăugată pentru a clarifica unde PipelineVariables sunt acceptate atât în ​​cea mai recentă versiune stabilă a documentației SageMaker SDK, cât și în semnătura inițială a funcțiilor. De exemplu, în următorul estimator TensorFlow, semnătura init arată acum asta model_dir și image_uri a sustine PipelineVariables, în timp ce ceilalți parametri nu. Pentru mai multe informații, consultați Estimator TensorFlow.
    • Inainte de:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • După:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sesiune pipeline - PipelineSession este un nou concept introdus pentru a aduce unitate în SDK-ul SageMaker și introduce inițializarea leneșă a resurselor pipelinei (apelurile de rulare sunt capturate, dar nu rulează până când pipeline este creat și rulat). The PipelineSession contextul moștenește SageMakerSession și implementează metode convenabile pentru a interacționa cu alte entități și resurse SageMaker, cum ar fi joburi de instruire, puncte finale și seturi de date de intrare stocate în Serviciul Amazon de stocare simplă (Amazon S3).
  • Compatibilitate subclasă cu pașii de lucru a fluxului de lucru – Acum puteți construi abstracții de job și puteți configura și rula joburi de procesare, antrenare, transformare și reglare așa cum ați face fără o conductă.
    • De exemplu, crearea unui pas de procesare cu SKLearnProcessor anterior solicitau următoarele:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • După cum vedem în codul precedent, ProcessingStep trebuie să facă practic aceeași logică de preprocesare ca .run, doar fără a iniția apelul API pentru a porni jobul. Dar, cu compatibilitatea cu subclasele activată acum cu pașii de lucru a fluxului de lucru, declarăm că step_args argument care preia logica de preprocesare cu .run, astfel încât să puteți construi o abstractizare a jobului și să o configurați așa cum ați folosi-o fără Pipelines. Trecem și în pipeline_session, Care este o PipelineSession obiect, în loc de sagemaker_session pentru a vă asigura că apelurile de rulare sunt capturate, dar nu sunt apelate până când conducta este creată și rulată. Vezi următorul cod:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Etapa modelului (o abordare simplificată cu pași de creare și înregistrare a modelului) –Pipelines oferă două tipuri de pași pentru a se integra cu modelele SageMaker: CreateModelStep și RegisterModel. Acum le puteți realiza pe ambele folosind numai ModelStep tip. Rețineți că a PipelineSession este necesar pentru a realiza acest lucru. Acest lucru aduce similitudini între pașii canalului și SDK-ul.
    • Inainte de:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • După:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Pas de eșec (oprirea condiționată a rulării conductei) - FailStep permite oprirea unei conducte cu o stare de eșec dacă o condiție este îndeplinită, cum ar fi dacă scorul modelului este sub un anumit prag.

Prezentare generală a soluțiilor

În această soluție, punctul dvs. de intrare este Amazon SageMaker Studio mediu de dezvoltare integrat (IDE) pentru experimentare rapidă. Studio oferă un mediu de gestionare a experienței pipelines end-to-end. Cu Studio, puteți ocoli Consola de administrare AWS pentru gestionarea întregului flux de lucru. Pentru mai multe informații despre gestionarea conductelor din Studio, consultați Vizualizați, urmăriți și executați conductele SageMaker în SageMaker Studio.

Următoarea diagramă ilustrează arhitectura de nivel înalt a fluxului de lucru ML cu diferiții pași pentru antrenament și generare de inferențe folosind noile caracteristici.

Conducta include următorii pași:

  1. Preprocesați datele pentru a construi caracteristicile necesare și împărțiți datele în seturi de date de tren, validare și testare.
  2. Creați un job de formare cu cadrul SageMaker XGBoost.
  3. Evaluați modelul antrenat folosind setul de date de testare.
  4. Verificați dacă scorul AUC este peste un prag predefinit.
    • Dacă scorul AUC este mai mic decât pragul, opriți rularea conductei și marcați-l ca eșuat.
    • Dacă scorul AUC este mai mare decât pragul, creați un model SageMaker și înregistrați-l în registrul de modele SageMaker.
  5. Aplicați transformarea lot pe setul de date dat folosind modelul creat în pasul anterior.

Cerințe preliminare

Pentru a urma această postare, aveți nevoie de un cont AWS cu a Domeniul studio.

Pipelines este integrat direct cu entitățile și resursele SageMaker, astfel încât nu trebuie să interacționați cu alte servicii AWS. De asemenea, nu trebuie să gestionați resurse, deoarece este un serviciu complet gestionat, ceea ce înseamnă că creează și gestionează resurse pentru dvs. Pentru mai multe informații despre diferitele componente SageMaker, care sunt atât API-uri Python autonome, cât și componente integrate ale Studio, consultați Pagina de produs SageMaker.

Înainte de a începe, instalați versiunea SDK SageMaker >= 2.104.0 și xlrd >=1.0.0 în blocnotesul Studio folosind următorul fragment de cod:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Fluxul de lucru ML

Pentru această postare, utilizați următoarele componente:

  • Pregătirea datelor
    • Procesare SageMaker – SageMaker Processing este un serviciu complet gestionat care vă permite să executați transformări personalizate de date și inginerie de caracteristici pentru sarcinile de lucru ML.
  • Construirea modelului
  • Model de instruire și evaluare
    • Antrenament cu un singur clic – Funcția de instruire distribuită SageMaker. SageMaker oferă biblioteci de instruire distribuite pentru paralelismul datelor și paralelismul modelelor. Bibliotecile sunt optimizate pentru mediul de instruire SageMaker, vă ajută să vă adaptați joburile de instruire distribuite la SageMaker și să îmbunătățească viteza și debitul de antrenament.
    • Experimentele SageMaker – Experiments este o capacitate a SageMaker care vă permite să organizați, urmăriți, comparați și evaluați iterațiile ML.
    • Transformarea lotului SageMaker – Transformarea lotului sau scoring offline este un serviciu gestionat în SageMaker care vă permite să preziceți pe un set de date mai mare utilizând modelele dvs. ML.
  • Orchestrarea fluxului de lucru

O conductă SageMaker este o serie de pași interconectați definiți printr-o definiție a conductei JSON. Acesta codifică o conductă folosind un grafic aciclic direcționat (DAG). DAG oferă informații despre cerințele și relațiile dintre fiecare pas al conductei, iar structura sa este determinată de dependențele de date dintre pași. Aceste dependențe sunt create atunci când proprietățile ieșirii unui pas sunt transmise ca intrare către alt pas.

Următoarea diagramă ilustrează diferiții pași din pipeline SageMaker (pentru un caz de utilizare a predicției de pierdere) în care conexiunile dintre pași sunt deduse de SageMaker pe baza intrărilor și ieșirilor definite de definițiile pașilor.

Următoarele secțiuni parcurg crearea fiecărui pas al conductei și rularea întregii conducte odată creată.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.

Structura proiectului

Să începem cu structura proiectului:

  • /sm-pipelines-end-to-end-example – Numele proiectului
    • /date – Seturile de date
    • /conducte – Fișierele de cod pentru componentele conductei
      • /customerchurn
        • preprocesare.py
        • evalua.py
    • sagemaker-pipelines-project.ipynb – Un notebook care parcurge fluxul de lucru de modelare folosind noile caracteristici ale Pipelines

Descărcați setul de date

Pentru a urma această postare, trebuie să descărcați și să salvați set de date eșantion sub folderul de date din directorul principal al proiectului, care salvează fișierul în Sistem de fișiere elastice Amazon (Amazon EFS) în mediul Studio.

Construiți componentele conductei

Acum sunteți gata să construiți componentele conductei.

Importați instrucțiuni și declarați parametri și constante

Creați un blocnotes Studio numit sagemaker-pipelines-project.ipynb în directorul principal al proiectului. Introduceți următorul bloc de cod într-o celulă și rulați celula pentru a configura obiectele client SageMaker și S3, creați PipelineSessionși configurați locația compartimentului S3 utilizând compartimentul implicit care vine cu o sesiune SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines acceptă parametrizarea, ceea ce vă permite să specificați parametrii de intrare în timpul execuției fără a modifica codul conductei. Puteți utiliza modulele disponibile sub sagemaker.workflow.parameters modul, cum ar fi ParameterInteger, ParameterFloat, și ParameterString, pentru a specifica parametrii conductei de diferite tipuri de date. Rulați următorul cod pentru a configura mai mulți parametri de intrare:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generați un set de date lot

Generați setul de date lot, pe care îl utilizați mai târziu în pasul de transformare batch:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Încărcați date într-un compartiment S3

Încărcați seturile de date pe Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definiți un script de procesare și un pas de procesare

În acest pas, pregătiți un script Python pentru a face inginerie de caracteristici, o codificare fierbinte și organizați secțiunile de instruire, validare și testare care vor fi utilizate pentru construirea modelului. Rulați următorul cod pentru a vă construi scriptul de procesare:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Apoi, rulați următorul bloc de cod pentru a instanția procesorul și pasul Pipelines pentru a rula scriptul de procesare. Deoarece scriptul de procesare este scris în Pandas, utilizați a SKLearnProcessor. Conductele ProcessingStep funcția ia următoarele argumente: procesorul, locațiile S3 de intrare pentru seturile de date brute și locațiile S3 de ieșire pentru a salva seturile de date procesate.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definiți un pas de antrenament

Configurați antrenamentul modelului folosind un estimator SageMaker XGBoost și Pipelines TrainingStep funcţie:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definiți scenariul de evaluare și etapa de evaluare a modelului

Rulați următorul bloc de cod pentru a evalua modelul odată antrenat. Acest script încapsulează logica pentru a verifica dacă scorul AUC îndeplinește pragul specificat.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Apoi, rulați următorul bloc de cod pentru a instanția procesorul și pasul Pipelines pentru a rula scriptul de evaluare. Deoarece scriptul de evaluare folosește pachetul XGBoost, utilizați a ScriptProcessor împreună cu imaginea XGBoost. Conductele ProcessingStep funcția ia următoarele argumente: procesorul, locațiile S3 de intrare pentru seturile de date brute și locațiile S3 de ieșire pentru a salva seturile de date procesate.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definiți un pas de creare a modelului

Rulați următorul bloc de cod pentru a crea un model SageMaker utilizând pasul model Pipelines. Acest pas utilizează rezultatul pasului de instruire pentru a împacheta modelul pentru implementare. Rețineți că valoarea pentru argumentul tip instanță este transmisă utilizând parametrul Pipelines pe care l-ați definit mai devreme în postare.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definiți o etapă de transformare în lot

Rulați următorul bloc de cod pentru a rula transformarea lotului folosind modelul antrenat cu intrarea lot creată în primul pas:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definiți un pas de model de registru

Următorul cod înregistrează modelul în registrul modelului SageMaker utilizând pasul model Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definiți un pas de eșec pentru a opri conducta

Următorul cod definește pasul de eșuare a conductelor pentru a opri rularea conductei cu un mesaj de eroare dacă scorul AUC nu atinge pragul definit:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definiți un pas de condiție pentru a verifica scorul AUC

Următorul cod definește un pas de condiție pentru a verifica scorul AUC și a crea condiționat un model și a rula o transformare în lot și a înregistra un model în registrul modelului sau pentru a opri rularea conductei într-o stare eșuată:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Construiți și rulați conducta

După definirea tuturor pașilor componentelor, le puteți asambla într-un obiect Pipelines. Nu trebuie să specificați ordinea conductei, deoarece Pipelines deduce automat secvența comenzii pe baza dependențelor dintre pași.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Rulați următorul cod într-o celulă din blocnotes. Dacă conducta există deja, codul actualizează conducta. Dacă conducta nu există, se creează una nouă.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Concluzie

În această postare, am introdus câteva dintre noile funcții disponibile acum cu Pipelines, împreună cu alte funcții încorporate SageMaker și algoritmul XGBoost pentru a dezvolta, repeta și implementa un model pentru predicția abandonului. Soluția poate fi extinsă cu surse de date suplimentare

pentru a implementa propriul flux de lucru ML. Pentru mai multe detalii despre pașii disponibili în fluxul de lucru Pipelines, consultați Amazon SageMaker Model Building Pipeline și Fluxuri de lucru SageMaker. Exemple AWS SageMaker GitHub repo are mai multe exemple despre diverse cazuri de utilizare folosind Pipelines.


Despre Autori

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.Jerry Peng este inginer de dezvoltare software cu AWS SageMaker. El se concentrează pe construirea unui sistem MLOps la scară largă, de la formare până la monitorizarea modelului în producție. De asemenea, este pasionat de a aduce conceptul MLOps unui public mai larg.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.Dewen Qi este inginer de dezvoltare software în AWS. În prezent, se concentrează pe dezvoltarea și îmbunătățirea SageMaker Pipelines. În afara serviciului, îi place să practice violoncelul.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.Gayatri Ghanakota este inginer Sr. Machine Learning cu AWS Professional Services. Este pasionată de dezvoltarea, implementarea și explicarea soluțiilor AI/ML în diferite domenii. Înainte de acest rol, ea a condus mai multe inițiative ca cercetător de date și inginer ML cu firme globale de top din spațiul financiar și de retail. Ea deține o diplomă de master în Informatică specializată în Știința datelor de la Universitatea din Colorado, Boulder.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.Rupinder Grewal este un arhitect specializat în soluții Sr Ai/ML cu AWS. În prezent, se concentrează pe servirea modelelor și a MLOps-ului pe SageMaker. Înainte de acest rol, a lucrat ca inginer de învățare automată, construind și găzduind modele. În afara serviciului, îi place să joace tenis și să meargă cu bicicleta pe traseele montane.

Noi funcții pentru Amazon SageMaker Pipelines și Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Căutare verticală. Ai.Ray Li este un Data Scientist Sr. cu AWS Professional Services. Specialitatea sa se concentrează pe construirea și operaționalizarea soluțiilor AI/ML pentru clienți de diferite dimensiuni, de la startup-uri la organizații enterprise. În afara serviciului, lui Ray îi place fitness-ul și călătoriile.

Timestamp-ul:

Mai mult de la Învățare automată AWS