Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Novos recursos para o Amazon SageMaker Pipelines e o Amazon SageMaker SDK

Pipelines Amazon SageMaker permite que cientistas de dados e engenheiros de machine learning (ML) automatizem fluxos de trabalho de treinamento, o que ajuda a criar um processo repetível para orquestrar etapas de desenvolvimento de modelo para experimentação rápida e retreinamento de modelo. Você pode automatizar todo o fluxo de trabalho de construção de modelo, incluindo preparação de dados, engenharia de recursos, treinamento de modelo, ajuste de modelo e validação de modelo, e catalogá-lo no registro de modelo. Você pode configurar pipelines para serem executados automaticamente em intervalos regulares ou quando determinados eventos são acionados, ou pode executá-los manualmente conforme necessário.

Neste post, destacamos algumas das melhorias no Amazon Sage Maker SDK e introduza novos recursos do Amazon SageMaker Pipelines que facilitam para os profissionais de ML criar e treinar modelos de ML.

A Pipelines continua inovando sua experiência de desenvolvedor e, com esses lançamentos recentes, agora você pode usar o serviço de forma mais personalizada:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Documentação atualizada sobre PipelineVariable uso para classes base de estimador, processador, sintonizador, transformador e modelo, modelos da Amazon e modelos de estrutura. Haverá alterações adicionais com versões mais recentes do SDK para oferecer suporte a todas as subclasses de estimadores e processadores.
  • 2.90.0 - Disponibilidade de ModeloEtapa para criação de recursos de modelo integrado e tarefas de registro.
  • 2.88.2 - Disponibilidade de Sessão de Pipeline para interação gerenciada com entidades e recursos do SageMaker.
  • 2.88.2 – Compatibilidade de subclasse para etapas de trabalho de pipeline de fluxo de trabalho para que você possa criar abstrações de trabalho e configurar e executar trabalhos de processamento, treinamento, transformação e ajuste como faria sem um pipeline.
  • 2.76.0 - Disponibilidade de FailStep para interromper condicionalmente um pipeline com status de falha.

Nesta postagem, orientamos você em um fluxo de trabalho usando um conjunto de dados de exemplo com foco na criação e implantação de modelos para demonstrar como implementar os novos recursos do Pipelines. No final, você deve ter informações suficientes para usar com êxito esses recursos mais recentes e simplificar suas cargas de trabalho de ML.

Visão geral dos recursos

O Pipelines oferece os seguintes novos recursos:

  • Anotação de variável de pipeline – Certos parâmetros de método aceitam vários tipos de entrada, incluindo PipelineVariables, e foi adicionada documentação adicional para esclarecer onde PipelineVariables são suportados na versão estável mais recente da documentação do SDK do SageMaker e na assinatura init das funções. Por exemplo, no estimador TensorFlow a seguir, a assinatura init agora mostra que model_dir e image_uri ajuda PipelineVariables, enquanto os outros parâmetros não. Para mais informações, consulte Estimador de TensorFlow.
    • Antes:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Depois:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sessão de pipeline - Sessão de Pipeline é um novo conceito introduzido para trazer unidade ao SDK do SageMaker e introduz a inicialização lenta dos recursos do pipeline (as chamadas de execução são capturadas, mas não executadas até que o pipeline seja criado e executado). o PipelineSession contexto herda o SageMakerSession e implementa métodos convenientes para você interagir com outras entidades e recursos do SageMaker, como trabalhos de treinamento, endpoints e conjuntos de dados de entrada armazenados em Serviço de armazenamento simples da Amazon (Amazônia S3).
  • Compatibilidade de subclasse com etapas de trabalho de pipeline de fluxo de trabalho – Agora você pode criar abstrações de trabalho e configurar e executar trabalhos de processamento, treinamento, transformação e ajuste como faria sem um pipeline.
    • Por exemplo, criar uma etapa de processamento com SKLearnProcessor anteriormente exigia o seguinte:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Como vimos no código anterior, ProcessingStep precisa fazer basicamente a mesma lógica de pré-processamento que .run, apenas sem iniciar a chamada da API para iniciar o trabalho. Mas com a compatibilidade de subclasse agora habilitada com etapas de trabalho de pipeline de fluxo de trabalho, declaramos o step_args argumento que usa a lógica de pré-processamento com .run para que você possa construir uma abstração de trabalho e configurá-la como você a usaria sem Pipelines. Também passamos no pipeline_session, que é um PipelineSession objeto, em vez de sagemaker_session para garantir que as chamadas de execução sejam capturadas, mas não chamadas até que o pipeline seja criado e executado. Veja o seguinte código:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Etapa do modelo (uma abordagem simplificada com etapas de criação e registro do modelo) –O Pipelines oferece dois tipos de etapas para integração com os modelos SageMaker: CreateModelStep e RegisterModel. Agora você pode conseguir ambos usando apenas o ModelStep modelo. Observe que um PipelineSession é necessário para conseguir isso. Isso traz semelhança entre as etapas do pipeline e o SDK.
    • Antes:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Depois:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Etapa de falha (parada condicional da execução do pipeline) - FailStep permite que um pipeline seja interrompido com um status de falha se uma condição for atendida, como se a pontuação do modelo estiver abaixo de um determinado limite.

Visão geral da solução

Nesta solução, seu ponto de entrada é o Estúdio Amazon SageMaker ambiente de desenvolvimento integrado (IDE) para experimentação rápida. O Studio oferece um ambiente para gerenciar a experiência de Pipelines de ponta a ponta. Com o Studio, você pode ignorar o Console de gerenciamento da AWS para todo o seu gerenciamento de fluxo de trabalho. Para obter mais informações sobre o gerenciamento de pipelines no Studio, consulte Visualize, rastreie e execute pipelines do SageMaker no SageMaker Studio.

O diagrama a seguir ilustra a arquitetura de alto nível do fluxo de trabalho de ML com as diferentes etapas para treinar e gerar inferências usando os novos recursos.

O pipeline inclui as seguintes etapas:

  1. Pré-processe os dados para criar os recursos necessários e divida os dados em conjuntos de dados de treinamento, validação e teste.
  2. Crie um trabalho de treinamento com a estrutura SageMaker XGBoost.
  3. Avalie o modelo treinado usando o conjunto de dados de teste.
  4. Verifique se a pontuação AUC está acima de um limite predefinido.
    • Se a pontuação AUC for menor que o limite, interrompa a execução do pipeline e marque-a como com falha.
    • Se a pontuação AUC for maior que o limite, crie um modelo do SageMaker e registre-o no registro do modelo do SageMaker.
  5. Aplique a transformação em lote no conjunto de dados fornecido usando o modelo criado na etapa anterior.

Pré-requisitos

Para acompanhar este post, você precisa de uma conta AWS com um Domínio do estúdio.

O Pipelines é integrado diretamente às entidades e recursos do SageMaker, para que você não precise interagir com nenhum outro serviço da AWS. Você também não precisa gerenciar nenhum recurso porque é um serviço totalmente gerenciado, o que significa que ele cria e gerencia recursos para você. Para obter mais informações sobre os vários componentes do SageMaker que são APIs Python independentes e componentes integrados do Studio, consulte o página do produto SageMaker.

Antes de começar, instale a versão do SDK do SageMaker >= 2.104.0 e xlrd >=1.0.0 no notebook Studio usando o seguinte snippet de código:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Fluxo de trabalho de ML

Para este post, você usa os seguintes componentes:

  • Preparação de dados
    • Processamento SageMaker – O SageMaker Processing é um serviço totalmente gerenciado que permite executar transformações de dados personalizadas e engenharia de recursos para cargas de trabalho de ML.
  • Construção de modelo
  • Treinamento e avaliação do modelo
    • Treinamento em um clique – O recurso de treinamento distribuído do SageMaker. O SageMaker fornece bibliotecas de treinamento distribuídas para paralelismo de dados e paralelismo de modelo. As bibliotecas são otimizadas para o ambiente de treinamento do SageMaker, ajudam a adaptar seus trabalhos de treinamento distribuídos ao SageMaker e melhoram a velocidade e o rendimento do treinamento.
    • Experimentos SageMaker – Experimentos é um recurso do SageMaker que permite organizar, rastrear, comparar e avaliar suas iterações de ML.
    • Transformação em lote SageMaker – A transformação em lote ou pontuação offline é um serviço gerenciado no SageMaker que permite prever em um conjunto de dados maior usando seus modelos de ML.
  • Orquestração de fluxo de trabalho

Um pipeline do SageMaker é uma série de etapas interconectadas definidas por uma definição de pipeline JSON. Ele codifica um pipeline usando um gráfico acíclico direcionado (DAG). O DAG fornece informações sobre os requisitos e relacionamentos entre cada etapa do pipeline, e sua estrutura é determinada pelas dependências de dados entre as etapas. Essas dependências são criadas quando as propriedades da saída de uma etapa são passadas como entrada para outra etapa.

O diagrama a seguir ilustra as diferentes etapas no pipeline do SageMaker (para um caso de uso de previsão de rotatividade) em que as conexões entre as etapas são inferidas pelo SageMaker com base nas entradas e saídas definidas pelas definições das etapas.

As próximas seções explicam a criação de cada etapa do pipeline e a execução de todo o pipeline depois de criado.

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Estrutura do projeto

Vamos começar com a estrutura do projeto:

  • /sm-pipelines-end-to-end-exemplo – O nome do projeto
    • /dados – Os conjuntos de dados
    • /pipelines – Os arquivos de código para componentes de pipeline
      • /a rotatividade de clientes
        • pré-processo.py
        • avaliar.py
    • sagemaker-pipelines-project.ipynb – Um notebook percorrendo o fluxo de trabalho de modelagem usando os novos recursos do Pipelines

Baixe o conjunto de dados

Para acompanhar este post, você precisa baixar e salvar o amostra de conjunto de dados na pasta de dados dentro do diretório inicial do projeto, que salva o arquivo em Sistema de arquivos elástico da Amazon (Amazon EFS) no ambiente Studio.

Construir os componentes do pipeline

Agora você está pronto para construir os componentes do pipeline.

Importar instruções e declarar parâmetros e constantes

Crie um bloco de anotações do Studio chamado sagemaker-pipelines-project.ipynb dentro do diretório inicial do projeto. Insira o seguinte bloco de código em uma célula e execute a célula para configurar objetos de cliente SageMaker e S3, crie PipelineSessione configure o local do bucket do S3 usando o bucket padrão que acompanha uma sessão do SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines oferece suporte à parametrização, que permite especificar parâmetros de entrada em tempo de execução sem alterar o código do pipeline. Você pode usar os módulos disponíveis no sagemaker.workflow.parameters módulo, como ParameterInteger, ParameterFloat e ParameterString, para especificar parâmetros de pipeline de vários tipos de dados. Execute o seguinte código para configurar vários parâmetros de entrada:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Gerar um conjunto de dados em lote

Gere o conjunto de dados em lote, que você usará posteriormente na etapa de transformação em lote:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Carregar dados para um bucket do S3

Faça upload dos conjuntos de dados para o Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definir um script de processamento e uma etapa de processamento

Nesta etapa, você prepara um script Python para fazer engenharia de recursos, uma codificação a quente e seleciona as divisões de treinamento, validação e teste a serem usadas para a construção do modelo. Execute o seguinte código para criar seu script de processamento:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Em seguida, execute o bloco de código a seguir para instanciar o processador e a etapa Pipelines para executar o script de processamento. Como o script de processamento é escrito em Pandas, você usa um Processador SKLearn. Os oleodutos ProcessingStep A função recebe os seguintes argumentos: o processador, os locais S3 de entrada para conjuntos de dados brutos e os locais S3 de saída para salvar conjuntos de dados processados.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definir uma etapa de treinamento

Configure o treinamento do modelo usando um estimador SageMaker XGBoost e os Pipelines TrainingStep função:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definir o script de avaliação e a etapa de avaliação do modelo

Execute o bloco de código a seguir para avaliar o modelo depois de treinado. Este script encapsula a lógica para verificar se a pontuação AUC atende ao limite especificado.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Em seguida, execute o bloco de código a seguir para instanciar o processador e a etapa Pipelines para executar o script de avaliação. Como o script de avaliação usa o pacote XGBoost, você usa um ScriptProcessor junto com a imagem XGBoost. Os oleodutos ProcessingStep A função recebe os seguintes argumentos: o processador, os locais S3 de entrada para conjuntos de dados brutos e os locais S3 de saída para salvar conjuntos de dados processados.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definir uma etapa de criação de modelo

Execute o bloco de código a seguir para criar um modelo do SageMaker usando a etapa do modelo Pipelines. Esta etapa utiliza a saída da etapa de treinamento para empacotar o modelo para implantação. Observe que o valor do argumento do tipo de instância é passado usando o parâmetro Pipelines que você definiu anteriormente na postagem.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Definir uma etapa de transformação em lote

Execute o seguinte bloco de código para executar a transformação em lote usando o modelo treinado com a entrada em lote criada na primeira etapa:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definir uma etapa do modelo de registro

O código a seguir registra o modelo no registro de modelo do SageMaker usando a etapa do modelo Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definir uma etapa de falha para interromper o pipeline

O código a seguir define a etapa de falha de pipelines para interromper a execução do pipeline com uma mensagem de erro se a pontuação AUC não atender ao limite definido:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Defina uma etapa de condição para verificar a pontuação AUC

O código a seguir define uma etapa de condição para verificar a pontuação AUC e criar condicionalmente um modelo e executar uma transformação em lote e registrar um modelo no registro do modelo ou interromper a execução do pipeline em um estado de falha:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Construir e executar o pipeline

Depois de definir todas as etapas do componente, você pode montá-las em um objeto Pipelines. Você não precisa especificar a ordem do pipeline porque o Pipelines infere automaticamente a sequência do pedido com base nas dependências entre as etapas.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Execute o seguinte código em uma célula em seu notebook. Se o pipeline já existir, o código atualiza o pipeline. Se o pipeline não existir, ele cria um novo.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Conclusão

Neste post, apresentamos alguns dos novos recursos agora disponíveis com o Pipelines, juntamente com outros recursos integrados do SageMaker e o algoritmo XGBoost para desenvolver, iterar e implantar um modelo para previsão de churn. A solução pode ser estendida com fontes de dados adicionais

para implementar seu próprio fluxo de trabalho de ML. Para obter mais detalhes sobre as etapas disponíveis no fluxo de trabalho de pipelines, consulte Pipeline de criação de modelos do Amazon SageMaker e Fluxos de trabalho do SageMaker. O Exemplos do AWS SageMaker O repositório do GitHub tem mais exemplos de vários casos de uso usando Pipelines.


Sobre os autores

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Jerry Peng é engenheiro de desenvolvimento de software do AWS SageMaker. Ele se concentra na criação de sistemas MLOps de larga escala de ponta a ponta, desde o treinamento até o monitoramento de modelos na produção. Ele também é apaixonado por trazer o conceito de MLOps para um público mais amplo.

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Dewen Qi é engenheiro de desenvolvimento de software na AWS. Atualmente, ela se concentra no desenvolvimento e aprimoramento de Pipelines do SageMaker. Fora do trabalho, ela gosta de praticar violoncelo.

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Gayatri Ganakota é engenheiro sênior de aprendizado de máquina com AWS Professional Services. Ela é apaixonada por desenvolver, implantar e explicar soluções de IA/ML em vários domínios. Antes dessa função, ela liderou várias iniciativas como cientista de dados e engenheira de ML com as principais empresas globais no espaço financeiro e de varejo. Ela possui mestrado em Ciência da Computação com especialização em Ciência de Dados pela University of Colorado, Boulder.

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Rupinder Grewal é um arquiteto de soluções especialista sênior em Ai/ML da AWS. Ele atualmente se concentra em servir de modelos e MLOps no SageMaker. Antes dessa função, ele trabalhou como engenheiro de aprendizado de máquina construindo e hospedando modelos. Fora do trabalho, ele gosta de jogar tênis e andar de bicicleta em trilhas nas montanhas.

Novos recursos para Amazon SageMaker Pipelines e Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Ray Li é um Cientista de Dados Sênior com AWS Professional Services. Sua especialidade se concentra na criação e operacionalização de soluções de IA/ML para clientes de tamanhos variados, desde startups a organizações empresariais. Fora do trabalho, Ray gosta de fazer exercícios e viajar.

Carimbo de hora:

Mais de Aprendizado de máquina da AWS