Melhores práticas e padrões de design para criar fluxos de trabalho de machine learning com Amazon SageMaker Pipelines | Amazon Web Services

Melhores práticas e padrões de design para criar fluxos de trabalho de machine learning com Amazon SageMaker Pipelines | Amazon Web Services

Pipelines Amazon SageMaker é um serviço AWS totalmente gerenciado para criar e orquestrar fluxos de trabalho de aprendizado de máquina (ML). O SageMaker Pipelines oferece aos desenvolvedores de aplicativos de ML a capacidade de orquestrar diferentes etapas do fluxo de trabalho de ML, incluindo carregamento de dados, transformação de dados, treinamento, ajuste e implantação. Você pode usar o SageMaker Pipelines para orquestrar trabalhos de ML no SageMaker e seus integração com o ecossistema maior da AWS também permite que você use recursos como AWS Lambda funções, Amazon EMR empregos e muito mais. Isso permite que você crie um pipeline personalizado e reproduzível para requisitos específicos em seus fluxos de trabalho de ML.

Nesta postagem, fornecemos algumas práticas recomendadas para maximizar o valor do SageMaker Pipelines e tornar a experiência de desenvolvimento perfeita. Também discutimos alguns cenários e padrões de design comuns ao construir pipelines do SageMaker e fornecemos exemplos para abordá-los.

Melhores práticas para pipelines SageMaker

Nesta seção, discutimos algumas práticas recomendadas que podem ser seguidas ao projetar fluxos de trabalho usando SageMaker Pipelines. Adotá-los pode melhorar o processo de desenvolvimento e agilizar o gerenciamento operacional do SageMaker Pipelines.

Use a sessão do pipeline para carregamento lento do pipeline

Sessão de pipeline permite a inicialização lenta de recursos de pipeline (os trabalhos não são iniciados até o tempo de execução do pipeline). O PipelineSession contexto herda o Sessão SageMaker e implementa métodos convenientes para interagir com outras entidades e recursos do SageMaker, como trabalhos de treinamento, endpoints, conjuntos de dados de entrada em Serviço de armazenamento simples da Amazon (Amazon S3) e assim por diante. Ao definir SageMaker Pipelines, você deve usar PipelineSession durante a sessão normal do SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Execute pipelines no modo local para iterações rápidas e econômicas durante o desenvolvimento

Você pode executar um pipeline em modo local usando o LocalPipelineSession contexto. Nesse modo, o pipeline e os trabalhos são executados localmente usando recursos na máquina local, em vez de recursos gerenciados pelo SageMaker. O modo local fornece uma maneira econômica de iterar o código do pipeline com um subconjunto menor de dados. Depois que o pipeline for testado localmente, ele poderá ser dimensionado para execução usando o Sessão de Pipeline contexto.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Gerencie um pipeline do SageMaker por meio de controle de versão

O versionamento de artefatos e definições de pipeline é um requisito comum no ciclo de vida de desenvolvimento. Você pode criar várias versões do pipeline nomeando objetos de pipeline com um prefixo ou sufixo exclusivo, sendo o mais comum um carimbo de data/hora, conforme mostrado no código a seguir:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organize e rastreie execuções de pipeline do SageMaker integrando-o com SageMaker Experiments

O SageMaker Pipelines pode ser facilmente integrado com Experimentos SageMaker para organização e rastreamento de execuções de pipeline. Isto é conseguido especificando PipelineExperimentConfig na hora de criar um objeto de pipeline. Com este objeto de configuração, você pode especificar um nome de experimento e um nome de avaliação. Os detalhes de execução de um pipeline SageMaker são organizados de acordo com o experimento e teste especificados. Se você não especificar explicitamente um nome de experimento, um nome de pipeline será usado para o nome do experimento. Da mesma forma, se você não especificar explicitamente um nome de avaliação, um ID de execução de pipeline será usado para o nome do grupo de avaliação ou execução. Veja o seguinte código:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Execute pipelines do SageMaker com segurança em uma VPC privada

Para proteger as cargas de trabalho de ML, é uma prática recomendada implantar os trabalhos orquestrados pelo SageMaker Pipelines em uma configuração de rede segura dentro de uma VPC privada, sub-redes privadas e grupos de segurança. Para garantir e impor o uso deste ambiente seguro, você pode implementar o seguinte Gerenciamento de acesso e identidade da AWS (IAM) para o Função de execução do SageMaker (este é o papel assumido pelo pipeline durante sua execução). Você também pode adicionar a política para executar os trabalhos orquestrados pelo SageMaker Pipelines no modo de isolamento de rede.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

Para obter um exemplo de implementação de pipeline com esses controles de segurança em vigor, consulte Orquestrando trabalhos, registro de modelo e implantação contínua com o Amazon SageMaker em um ambiente seguro.

Monitore o custo das execuções do pipeline usando tags

O uso de pipelines SageMaker por si só é gratuito; você paga pelos recursos de computação e armazenamento gerados como parte das etapas individuais do pipeline, como processamento, treinamento e inferência em lote. Para agregar os custos por execução de pipeline, você pode incluir Tag em cada etapa do pipeline que cria um recurso. Essas tags podem então ser referenciadas no explorador de custos para filtrar e agregar o custo total de execução do pipeline, conforme mostrado no exemplo a seguir:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

No explorador de custos, agora você pode filtrar o custo pela tag:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Padrões de design para alguns cenários comuns

Nesta seção, discutimos padrões de design para alguns casos de uso comuns do SageMaker Pipelines.

Execute uma função Python leve usando uma etapa do Lambda

As funções Python são onipresentes nos fluxos de trabalho de ML; eles são usados ​​em pré-processamento, pós-processamento, avaliação e muito mais. Lambda é um serviço de computação sem servidor que permite executar código sem provisionar ou gerenciar servidores. Com o Lambda, você pode executar código na linguagem de sua preferência que inclui Python. Você pode usar isso para executar código Python personalizado como parte do seu pipeline. Uma etapa Lambda permite que você execute funções do Lambda como parte do pipeline do SageMaker. Comece com o seguinte código:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Crie a função do Lambda usando o Auxiliar Lambda do SageMaker Python SDK:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Chame a etapa Lambda:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Passe dados entre etapas

Os dados de entrada para uma etapa do pipeline são um local de dados acessível ou dados gerados por uma das etapas anteriores do pipeline. Você pode fornecer essas informações como um ProcessingInput parâmetro. Vejamos alguns cenários de como você pode usar ProcessingInput.

Cenário 1: passar a saída (tipos de dados primitivos) de uma etapa do Lambda para uma etapa de processamento

Os tipos de dados primitivos referem-se a tipos de dados escalares como string, inteiro, booleano e flutuante.

O trecho de código a seguir define uma função Lambda que retorna um dicionário de variáveis ​​com tipos de dados primitivos. Seu código de função do Lambda retornará um JSON de pares de valores-chave quando invocado na etapa do Lambda no pipeline do SageMaker.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

Na definição do pipeline, você pode definir parâmetros de pipeline do SageMaker que são de um tipo de dados específico e definir a variável para a saída da função Lambda:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Cenário 2: passar a saída (tipos de dados não primitivos) de uma etapa do Lambda para uma etapa de processamento

Tipos de dados não primitivos referem-se a tipos de dados não escalares (por exemplo, NamedTuple). Você pode ter um cenário em que precisa retornar um tipo de dados não primitivo de uma função Lambda. Para fazer isso, você deve converter seu tipo de dados não primitivo em uma string:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Em seguida, você pode usar essa string como entrada para uma etapa subsequente no pipeline. Para usar a tupla nomeada no código, use eval() para analisar a expressão Python na string:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Cenário 3: Passar a saída de uma etapa por meio de um arquivo de propriedades

Também é possível armazenar a saída de uma etapa de processamento em um arquivo JSON de propriedade para consumo a jusante num ConditionStep ou outro ProcessingStep. Você pode usar o Função JSONGet para consultar um arquivo de propriedade. Veja o seguinte código:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

Vamos supor que o conteúdo do arquivo de propriedades seja o seguinte:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

Nesse caso, ele pode ser consultado por um valor específico e usado nas etapas subsequentes usando a função JsonGet:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Parametrizar uma variável na definição do pipeline

Muitas vezes é desejável parametrizar variáveis ​​para que possam ser usadas em tempo de execução — por exemplo, para construir um URI S3. Você pode parametrizar uma string de forma que ela seja avaliada em tempo de execução usando o Join função. O trecho de código a seguir mostra como definir a variável usando o Join função e use-a para definir o local de saída em uma etapa de processamento:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Execute código paralelo em um iterável

Alguns fluxos de trabalho de ML executam código em loops for paralelos em um conjunto estático de itens (um iterável). Pode ser o mesmo código executado em dados diferentes ou um trecho de código diferente que precisa ser executado para cada item. Por exemplo, se você tiver um número muito grande de linhas em um arquivo e quiser acelerar o tempo de processamento, poderá confiar no padrão anterior. Se quiser realizar transformações diferentes em subgrupos específicos nos dados, talvez seja necessário executar um trecho de código diferente para cada subgrupo nos dados. Os dois cenários a seguir ilustram como você pode projetar pipelines do SageMaker para essa finalidade.

Cenário 1: Implementar uma lógica de processamento em diferentes partes dos dados

Você pode executar um trabalho de processamento com múltiplas instâncias (definindo instance_count para um valor maior que 1). Isso distribui os dados de entrada do Amazon S3 em todas as instâncias de processamento. Você pode então usar um script (process.py) para trabalhar em uma parte específica dos dados com base no número da instância e no elemento correspondente na lista de itens. A lógica de programação em process.py pode ser escrita de forma que um módulo ou trecho de código diferente seja executado dependendo da lista de itens que ele processa. O exemplo a seguir define um processador que pode ser usado em um ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Cenário 2: Execute uma sequência de etapas

Quando você tem uma sequência de etapas que precisa ser executada em paralelo, pode definir cada sequência como um pipeline independente do SageMaker. A execução desses pipelines do SageMaker pode então ser acionada a partir de uma função Lambda que faz parte de um LambdaStep no pipeline pai. O trecho de código a seguir ilustra o cenário em que duas execuções de pipeline diferentes do SageMaker são acionadas:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

Conclusão

Nesta postagem, discutimos algumas práticas recomendadas para o uso e manutenção eficiente de pipelines SageMaker. Também fornecemos certos padrões que você pode adotar ao projetar fluxos de trabalho com SageMaker Pipelines, esteja você criando novos pipelines ou migrando fluxos de trabalho de ML de outras ferramentas de orquestração. Para começar a usar o SageMaker Pipelines para orquestração de fluxo de trabalho de ML, consulte o amostras de código no GitHub e Pipelines de criação de modelos do Amazon SageMaker.


Sobre os autores

Melhores práticas e padrões de design para criar fluxos de trabalho de machine learning com Amazon SageMaker Pipelines | Inteligência de dados PlatoBlockchain da Amazon Web Services. Pesquisa vertical. Ai.Pinak Panigrahi trabalha com clientes para criar soluções orientadas por aprendizado de máquina para resolver problemas estratégicos de negócios na AWS. Quando não está ocupado com aprendizado de máquina, ele pode ser encontrado fazendo caminhadas, lendo um livro ou assistindo esportes.

Melhores práticas e padrões de design para criar fluxos de trabalho de machine learning com Amazon SageMaker Pipelines | Inteligência de dados PlatoBlockchain da Amazon Web Services. Pesquisa vertical. Ai.Meenakshisundaram Thandavarayan trabalha para a AWS como um AI/ML Specialist. Ele tem paixão por projetar, criar e promover dados centrados no ser humano e experiências analíticas. A Meena se concentra no desenvolvimento de sistemas sustentáveis ​​que oferecem vantagens competitivas mensuráveis ​​para clientes estratégicos da AWS. Meena é uma conectora, pensadora de design e se esforça para conduzir os negócios a novas formas de trabalhar por meio da inovação, incubação e democratização.

Carimbo de hora:

Mais de Aprendizado de máquina da AWS