Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK

Canalizaciones de Amazon SageMaker permite a los científicos de datos y a los ingenieros de aprendizaje automático (ML) automatizar los flujos de trabajo de capacitación, lo que lo ayuda a crear un proceso repetible para orquestar los pasos de desarrollo de modelos para una experimentación rápida y una nueva capacitación de modelos. Puede automatizar todo el flujo de trabajo de creación de modelos, incluida la preparación de datos, la ingeniería de características, el entrenamiento de modelos, el ajuste de modelos y la validación de modelos, y catalogarlo en el registro de modelos. Puede configurar las canalizaciones para que se ejecuten automáticamente a intervalos regulares o cuando se activan ciertos eventos, o puede ejecutarlas manualmente según sea necesario.

En esta publicación, destacamos algunas de las mejoras en el Amazon SageMaker SDK e introduzca nuevas características de Amazon SageMaker Pipelines que facilitan a los profesionales de ML la creación y capacitación de modelos de ML.

Pipelines continúa innovando su experiencia de desarrollador y, con estos lanzamientos recientes, ahora puede usar el servicio de una manera más personalizada:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Documentación actualizada sobre PipelineVariable uso para las clases base de estimador, procesador, sintonizador, transformador y modelo, modelos de Amazon y modelos de marco. Habrá cambios adicionales con versiones más nuevas del SDK para admitir todas las subclases de estimadores y procesadores.
  • 2.90.0 - Disponibilidad de ModeloPaso para tareas integradas de creación y registro de recursos de modelos.
  • 2.88.2 - Disponibilidad de Sesión de canalización para la interacción administrada con entidades y recursos de SageMaker.
  • 2.88.2 – Compatibilidad de subclases para pasos de trabajo de canalización de flujo de trabajo para que pueda crear abstracciones de trabajos y configurar y ejecutar trabajos de procesamiento, capacitación, transformación y ajuste como lo haría sin una canalización.
  • 2.76.0 - Disponibilidad de paso fallido para detener condicionalmente una canalización con un estado de error.

En esta publicación, lo guiaremos a través de un flujo de trabajo utilizando un conjunto de datos de muestra con un enfoque en la creación e implementación de modelos para demostrar cómo implementar las nuevas funciones de Pipelines. Al final, debería tener suficiente información para usar con éxito estas funciones más nuevas y simplificar sus cargas de trabajo de ML.

Descripción de las características

Pipelines ofrece las siguientes características nuevas:

  • Anotación de variable de canalización – Ciertos parámetros de método aceptan múltiples tipos de entrada, incluidos PipelineVariables, y se ha agregado documentación adicional para aclarar dónde PipelineVariables son compatibles tanto con la última versión estable de la documentación del SDK de SageMaker como con la firma de inicio de las funciones. Por ejemplo, en el siguiente estimador de TensorFlow, la firma init ahora muestra que model_dir y image_uri SOPORTE PipelineVariables, mientras que los otros parámetros no lo hacen. Para obtener más información, consulte Estimador TensorFlow.
    • Antes de:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Después:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sesión de canalizaciónSesión de canalización es un nuevo concepto introducido para traer unidad a través del SDK de SageMaker e introduce la inicialización diferida de los recursos de canalización (las llamadas de ejecución se capturan pero no se ejecutan hasta que se crea y ejecuta la canalización). los PipelineSession El contexto hereda el SageMakerSession e implementa métodos convenientes para que usted interactúe con otras entidades y recursos de SageMaker, como trabajos de capacitación, puntos finales y conjuntos de datos de entrada almacenados en Servicio de almacenamiento simple de Amazon (Amazon S3).
  • Compatibilidad de subclase con pasos de trabajo de canalización de flujo de trabajo – Ahora puede crear abstracciones de trabajos y configurar y ejecutar trabajos de procesamiento, capacitación, transformación y ajuste como lo haría sin una canalización.
    • Por ejemplo, crear un paso de procesamiento con SKLearnProcessor anteriormente requería lo siguiente:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Como vemos en el código anterior, ProcessingStep necesita hacer básicamente la misma lógica de preprocesamiento que .run, simplemente sin iniciar la llamada a la API para iniciar el trabajo. Pero con la compatibilidad de subclases ahora habilitada con los pasos de trabajo de canalización de flujo de trabajo, declaramos el step_args argumento que toma la lógica de preprocesamiento con .run para que pueda crear una abstracción de trabajo y configurarla como la usaría sin Pipelines. También pasamos en el pipeline_session, Que es un PipelineSession objeto, en lugar de sagemaker_session para asegurarse de que las llamadas de ejecución se capturen pero no se llamen hasta que se cree y ejecute la canalización. Ver el siguiente código:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Paso de modelo (un enfoque simplificado con creación de modelos y pasos de registro) –Pipelines ofrece dos tipos de pasos para integrarse con los modelos de SageMaker: CreateModelStep y RegisterModel. Ahora puede lograr ambos usando solo el ModelStep escribe. Tenga en cuenta que un PipelineSession se requiere para lograr esto. Esto brinda similitud entre los pasos de canalización y el SDK.
    • Antes de:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Después:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail step (detención condicional de la ejecución de la canalización)FailStep permite que una canalización se detenga con un estado de falla si se cumple una condición, como si la puntuación del modelo está por debajo de cierto umbral.

Resumen de la solución

En esta solución, su punto de entrada es el Estudio Amazon SageMaker entorno de desarrollo integrado (IDE) para la experimentación rápida. Studio ofrece un entorno para administrar la experiencia de Pipelines de un extremo a otro. Con Studio, puede omitir la Consola de administración de AWS para toda la gestión de su flujo de trabajo. Para obtener más información sobre cómo administrar Pipelines desde Studio, consulte Visualización, seguimiento y ejecución de canalizaciones de SageMaker en SageMaker Studio.

El siguiente diagrama ilustra la arquitectura de alto nivel del flujo de trabajo de ML con los diferentes pasos para entrenar y generar inferencias usando las nuevas características.

La canalización incluye los siguientes pasos:

  1. Preprocesar los datos para crear las funciones requeridas y dividir los datos en conjuntos de datos de entrenamiento, validación y prueba.
  2. Cree un trabajo de entrenamiento con el marco SageMaker XGBoost.
  3. Evalúe el modelo entrenado utilizando el conjunto de datos de prueba.
  4. Compruebe si la puntuación AUC está por encima de un umbral predefinido.
    • Si la puntuación de AUC es inferior al umbral, detenga la ejecución de la canalización y márquela como fallida.
    • Si la puntuación AUC es superior al umbral, cree un modelo de SageMaker y regístrelo en el registro de modelos de SageMaker.
  5. Aplique la transformación por lotes en el conjunto de datos dado utilizando el modelo creado en el paso anterior.

Requisitos previos

Para seguir esta publicación, necesita una cuenta de AWS con un Dominio de estudio.

Pipelines se integra directamente con las entidades y los recursos de SageMaker, por lo que no necesita interactuar con ningún otro servicio de AWS. Tampoco necesita administrar ningún recurso porque es un servicio completamente administrado, lo que significa que crea y administra recursos por usted. Para obtener más información sobre los diversos componentes de SageMaker que son API de Python independientes junto con componentes integrados de Studio, consulte la Página de producto de SageMaker.

Antes de comenzar, instale la versión del SDK de SageMaker >= 2.104.0 y xlrd >=1.0.0 en el portátil de Studio con el siguiente fragmento de código:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Flujo de trabajo de aprendizaje automático

Para esta publicación, utiliza los siguientes componentes:

  • Preparación de datos
    • Procesamiento SageMaker – SageMaker Processing es un servicio completamente administrado que le permite ejecutar transformaciones de datos personalizadas e ingeniería de características para cargas de trabajo de ML.
  • Construcción del modelo
  • Capacitación y evaluación de modelos
    • Entrenamiento con un clic – La función de formación distribuida de SageMaker. SageMaker proporciona bibliotecas de capacitación distribuidas para el paralelismo de datos y el paralelismo de modelos. Las bibliotecas están optimizadas para el entorno de capacitación de SageMaker, ayudan a adaptar sus trabajos de capacitación distribuidos a SageMaker y mejoran la velocidad y el rendimiento de la capacitación.
    • Experimentos de SageMaker – Experimentos es una capacidad de SageMaker que le permite organizar, rastrear, comparar y evaluar sus iteraciones de ML.
    • Transformación por lotes de SageMaker – La transformación por lotes o la puntuación sin conexión es un servicio administrado en SageMaker que le permite predecir en un conjunto de datos más grande utilizando sus modelos de ML.
  • Orquestación del flujo de trabajo

Una canalización de SageMaker es una serie de pasos interconectados definidos por una definición de canalización JSON. Codifica una tubería utilizando un gráfico acíclico dirigido (DAG). El DAG brinda información sobre los requisitos y las relaciones entre cada paso de la canalización, y su estructura está determinada por las dependencias de datos entre los pasos. Estas dependencias se crean cuando las propiedades de la salida de un paso se pasan como entrada a otro paso.

El siguiente diagrama ilustra los diferentes pasos en la canalización de SageMaker (para un caso de uso de predicción de abandono) donde SageMaker infiere las conexiones entre los pasos en función de las entradas y salidas definidas por las definiciones de paso.

Las siguientes secciones explican la creación de cada paso de la canalización y la ejecución de la canalización completa una vez creada.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Estructura del proyecto

Comencemos con la estructura del proyecto:

  • /sm-pipelines-extremo-a-extremo-ejemplo – El nombre del proyecto
    • /datos – Los conjuntos de datos
    • / pipelines – Los archivos de código para los componentes de la canalización
      • /abandono de clientes
        • preproceso.py
        • evaluar.py
    • proyecto-pipelines-sagemaker.ipynb – Un cuaderno que recorre el flujo de trabajo de modelado utilizando las nuevas funciones de Pipelines

Descargar el conjunto de datos

Para seguir esta publicación, debe descargar y guardar el conjunto de datos de muestra en la carpeta de datos dentro del directorio de inicio del proyecto, que guarda el archivo en Sistema de archivos elástico de Amazon (Amazon EFS) dentro del entorno Studio.

Construya los componentes de la canalización

Ahora está listo para construir los componentes de canalización.

Importar sentencias y declarar parámetros y constantes

Cree un cuaderno de Studio llamado sagemaker-pipelines-project.ipynb dentro del directorio de inicio del proyecto. Ingrese el siguiente bloque de código en una celda y ejecute la celda para configurar SageMaker y objetos de cliente S3, cree PipelineSessiony configure la ubicación del depósito de S3 utilizando el depósito predeterminado que viene con una sesión de SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines admite la parametrización, lo que le permite especificar parámetros de entrada en tiempo de ejecución sin cambiar su código de canalización. Puede utilizar los módulos disponibles en el sagemaker.workflow.parameters módulo, como ParameterInteger, ParameterFloaty ParameterString, para especificar parámetros de canalización de varios tipos de datos. Ejecute el siguiente código para configurar varios parámetros de entrada:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Generar un conjunto de datos por lotes

Genere el conjunto de datos por lotes, que utilizará más adelante en el paso de transformación por lotes:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Subir datos a un depósito S3

Cargue los conjuntos de datos en Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definir un guión de procesamiento y un paso de procesamiento

En este paso, prepara un script de Python para realizar la ingeniería de funciones, una codificación en caliente y seleccionar las divisiones de entrenamiento, validación y prueba que se utilizarán para la creación de modelos. Ejecute el siguiente código para crear su secuencia de comandos de procesamiento:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

A continuación, ejecute el siguiente bloque de código para instanciar el procesador y el paso Pipelines para ejecutar el script de procesamiento. Debido a que el script de procesamiento está escrito en Pandas, usa un SKLearnProcesador. Los oleoductos ProcessingStep La función toma los siguientes argumentos: el procesador, las ubicaciones S3 de entrada para conjuntos de datos sin procesar y las ubicaciones S3 de salida para guardar conjuntos de datos procesados.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definir un paso de entrenamiento

Configure el entrenamiento de modelos con un estimador SageMaker XGBoost y Pipelines TrainingStep función:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definir el guión de evaluación y el paso de evaluación del modelo.

Ejecute el siguiente bloque de código para evaluar el modelo una vez entrenado. Este script encapsula la lógica para comprobar si la puntuación de AUC alcanza el umbral especificado.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

A continuación, ejecute el siguiente bloque de código para instanciar el procesador y el paso Pipelines para ejecutar el script de evaluación. Debido a que el script de evaluación usa el paquete XGBoost, usa un ScriptProcessor junto con la imagen XGBoost. Los oleoductos ProcessingStep La función toma los siguientes argumentos: el procesador, las ubicaciones S3 de entrada para conjuntos de datos sin procesar y las ubicaciones S3 de salida para guardar conjuntos de datos procesados.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definir un paso de creación de modelo

Ejecute el siguiente bloque de código para crear un modelo de SageMaker mediante el paso del modelo Pipelines. Este paso utiliza el resultado del paso de entrenamiento para empaquetar el modelo para su implementación. Tenga en cuenta que el valor del argumento de tipo de instancia se pasa mediante el parámetro Pipelines que definió anteriormente en la publicación.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definir un paso de transformación por lotes

Ejecute el siguiente bloque de código para ejecutar la transformación por lotes utilizando el modelo entrenado con la entrada por lotes creada en el primer paso:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definir un paso de modelo de registro

El siguiente código registra el modelo en el registro de modelos de SageMaker mediante el paso del modelo Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definir un paso fallido para detener la canalización

El siguiente código define el paso de error de las canalizaciones para detener la ejecución de la canalización con un mensaje de error si la puntuación AUC no alcanza el umbral definido:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Defina un paso de condición para comprobar la puntuación AUC

El siguiente código define un paso de condición para comprobar la puntuación AUC y crear condicionalmente un modelo y ejecutar una transformación por lotes y registrar un modelo en el registro del modelo, o detener la ejecución de la canalización en un estado fallido:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Construya y ejecute la canalización

Después de definir todos los pasos del componente, puede ensamblarlos en un objeto Pipelines. No es necesario que especifique el orden de canalización porque Pipelines infiere automáticamente la secuencia de orden en función de las dependencias entre los pasos.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Ejecute el siguiente código en una celda de su cuaderno. Si la canalización ya existe, el código actualiza la canalización. Si la tubería no existe, crea una nueva.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Conclusión

En esta publicación, presentamos algunas de las nuevas funciones ahora disponibles con Pipelines junto con otras funciones integradas de SageMaker y el algoritmo XGBoost para desarrollar, iterar e implementar un modelo para la predicción de rotación. La solución se puede ampliar con fuentes de datos adicionales

para implementar su propio flujo de trabajo de ML. Para obtener más detalles sobre los pasos disponibles en el flujo de trabajo de Pipelines, consulte Canalización de creación de modelos de Amazon SageMaker y Flujos de trabajo de SageMaker. Ejemplos de AWS SageMaker El repositorio de GitHub tiene más ejemplos sobre varios casos de uso usando Pipelines.


Acerca de los autores

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.jerry peng es ingeniero de desarrollo de software en AWS SageMaker. Se centra en la construcción de un sistema MLOps a gran escala de extremo a extremo, desde la capacitación hasta el monitoreo de modelos en producción. También le apasiona llevar el concepto de MLOps a un público más amplio.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Qi Dewen es ingeniero de desarrollo de software en AWS. Actualmente se enfoca en desarrollar y mejorar SageMaker Pipelines. Fuera del trabajo, le gusta practicar violonchelo.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Gayatri Ghanakota es ingeniero sénior de aprendizaje automático en AWS Professional Services. Le apasiona desarrollar, implementar y explicar soluciones de IA/ML en varios dominios. Antes de este cargo, lideró múltiples iniciativas como científica de datos e ingeniera de ML con las principales firmas globales en el espacio financiero y minorista. Tiene una maestría en Ciencias de la Computación especializada en Ciencia de Datos de la Universidad de Colorado, Boulder.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Rupinder Grewal es un Arquitecto de Soluciones Sr. Ai/ML Especialista con AWS. Actualmente se enfoca en servir modelos y MLOps en SageMaker. Antes de este cargo, trabajó como ingeniero de aprendizaje automático creando y alojando modelos. Fuera del trabajo, le gusta jugar tenis y andar en bicicleta por senderos de montaña.

Nuevas funciones para Amazon SageMaker Pipelines y Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.rayo li es un científico de datos sénior de los servicios profesionales de AWS. Su especialidad se centra en la creación y puesta en marcha de soluciones de IA/ML para clientes de distintos tamaños, desde empresas emergentes hasta organizaciones empresariales. Fuera del trabajo, a Ray le gusta hacer ejercicio y viajar.

Sello de tiempo:

Mas de Aprendizaje automático de AWS