Bonnes pratiques et modèles de conception pour la création de flux de travail d'apprentissage automatique avec Amazon SageMaker Pipelines | Services Web Amazon

Bonnes pratiques et modèles de conception pour la création de flux de travail d'apprentissage automatique avec Amazon SageMaker Pipelines | Services Web Amazon

Pipelines Amazon SageMaker est un service AWS entièrement géré pour créer et orchestrer des flux de travail d'apprentissage automatique (ML). SageMaker Pipelines offre aux développeurs d'applications ML la possibilité d'orchestrer différentes étapes du flux de travail ML, notamment le chargement et la transformation des données, la formation, le réglage et le déploiement. Vous pouvez utiliser SageMaker Pipelines pour orchestrer des tâches ML dans SageMaker, et ses intégration avec l'écosystème AWS plus large vous permet également d'utiliser des ressources comme AWS Lambda fonctions, Amazon DME des emplois, et plus encore. Cela vous permet de créer un pipeline personnalisé et reproductible pour des exigences spécifiques dans vos flux de travail ML.

Dans cet article, nous proposons quelques bonnes pratiques pour maximiser la valeur de SageMaker Pipelines et rendre l'expérience de développement transparente. Nous discutons également de certains scénarios et modèles de conception courants lors de la création de pipelines SageMaker et fournissons des exemples pour les résoudre.

Meilleures pratiques pour les pipelines SageMaker

Dans cette section, nous discutons de certaines bonnes pratiques qui peuvent être suivies lors de la conception de flux de travail à l'aide de SageMaker Pipelines. Leur adoption peut améliorer le processus de développement et rationaliser la gestion opérationnelle de SageMaker Pipelines.

Utiliser Pipeline Session pour un chargement paresseux du pipeline

Séance de pipeline permet l'initialisation paresseuse des ressources du pipeline (les tâches ne sont démarrées qu'au moment de l'exécution du pipeline). Le PipelineSession le contexte hérite de Séance SageMaker et implémente des méthodes pratiques pour interagir avec d'autres entités et ressources SageMaker, telles que des tâches de formation, des points de terminaison, des ensembles de données d'entrée dans Service de stockage simple Amazon (Amazon S3), etc. Lors de la définition des pipelines SageMaker, vous devez utiliser PipelineSession au cours de la session SageMaker régulière :

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Exécutez des pipelines en mode local pour des itérations rentables et rapides pendant le développement

Vous pouvez exécuter un pipeline en mode local utilisant l' LocalPipelineSession contexte. Dans ce mode, le pipeline et les tâches sont exécutés localement en utilisant les ressources de la machine locale, au lieu des ressources gérées par SageMaker. Le mode local offre un moyen rentable d’itérer sur le code du pipeline avec un sous-ensemble de données plus petit. Une fois le pipeline testé localement, il peut être mis à l'échelle pour s'exécuter à l'aide du Session Pipeline contexte

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Gérer un pipeline SageMaker via le contrôle de version

La gestion des versions des artefacts et des définitions de pipeline est une exigence courante dans le cycle de vie de développement. Vous pouvez créer plusieurs versions du pipeline en nommant les objets du pipeline avec un préfixe ou un suffixe unique, le plus courant étant un horodatage, comme indiqué dans le code suivant :

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organisez et suivez les exécutions du pipeline SageMaker en intégrant SageMaker Experiments

SageMaker Pipelines peut être facilement intégré à Expériences SageMaker pour organiser et suivi des exécutions de pipelines. Ceci est réalisé en spécifiant PipelineExperimentConfig au moment de créer un objet pipeline. Avec cet objet de configuration, vous pouvez spécifier un nom d'expérience et un nom d'essai. Les détails d'exécution d'un pipeline SageMaker sont organisés selon l'expérience et l'essai spécifiés. Si vous ne spécifiez pas explicitement un nom d'expérience, un nom de pipeline est utilisé pour le nom de l'expérience. De même, si vous ne spécifiez pas explicitement un nom d'essai, un ID d'exécution de pipeline est utilisé pour l'essai ou le nom du groupe d'exécution. Voir le code suivant :

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Exécutez en toute sécurité des pipelines SageMaker dans un VPC privé

Pour sécuriser les charges de travail ML, il est recommandé de déployer les tâches orchestrées par SageMaker Pipelines dans une configuration réseau sécurisée au sein d'un VPC privé, de sous-réseaux privés et de groupes de sécurité. Pour garantir et imposer l'utilisation de cet environnement sécurisé, vous pouvez mettre en œuvre les éléments suivants Gestion des identités et des accès AWS (IAM) pour le Rôle d'exécution de SageMaker (c'est le rôle assumé par le pipeline lors de son exécution). Vous pouvez également ajouter la stratégie pour exécuter les tâches orchestrées par SageMaker Pipelines en mode d'isolation réseau.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

Pour un exemple de mise en œuvre d'un pipeline avec ces contrôles de sécurité en place, reportez-vous à Orchestration des tâches, de l'enregistrement des modèles et du déploiement continu avec Amazon SageMaker dans un environnement sécurisé.

Surveiller le coût des exécutions de pipelines à l'aide de balises

L'utilisation des pipelines SageMaker en elle-même est gratuite ; vous payez pour les ressources de calcul et de stockage que vous activez dans le cadre des étapes individuelles du pipeline telles que le traitement, la formation et l'inférence par lots. Pour regrouper les coûts par exécution de pipeline, vous pouvez inclure étiquettes à chaque étape du pipeline qui crée une ressource. Ces balises peuvent ensuite être référencées dans l'explorateur de coûts pour filtrer et agréger le coût total d'exécution du pipeline, comme le montre l'exemple suivant :

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

Depuis l'explorateur de coûts, vous pouvez désormais obtenir le coût filtré par la balise :

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Modèles de conception pour certains scénarios courants

Dans cette section, nous discutons des modèles de conception pour certains cas d'utilisation courants avec SageMaker Pipelines.

Exécutez une fonction Python légère à l'aide d'une étape Lambda

Les fonctions Python sont omniprésentes dans les workflows ML ; ils sont utilisés dans le prétraitement, le post-traitement, l'évaluation, etc. Lambda est un service de calcul sans serveur qui vous permet d'exécuter du code sans provisionner ni gérer de serveurs. Avec Lambda, vous pouvez exécuter du code dans votre langage préféré qui inclut Python. Vous pouvez l'utiliser pour exécuter du code Python personnalisé dans le cadre de votre pipeline. Une étape Lambda vous permet d'exécuter des fonctions Lambda dans le cadre de votre pipeline SageMaker. Commencez par le code suivant :

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Créez la fonction Lambda à l'aide de Assistant Lambda du SDK SageMaker Python:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Appelez l'étape Lambda :

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Transmettre des données entre les étapes

Les données d'entrée pour une étape du pipeline sont soit un emplacement de données accessible, soit des données générées par l'une des étapes précédentes du pipeline. Vous pouvez fournir ces informations comme ProcessingInput paramètre. Examinons quelques scénarios sur la façon dont vous pouvez utiliser ProcessingInput.

Scénario 1 : transmettre la sortie (types de données primitifs) d'une étape Lambda à une étape de traitement

Les types de données primitifs font référence aux types de données scalaires tels que chaîne, entier, booléen et flottant.

L'extrait de code suivant définit une fonction Lambda qui renvoie un dictionnaire de variables avec des types de données primitifs. Le code de votre fonction Lambda renverra un JSON de paires clé-valeur lorsqu'il sera invoqué à partir de l'étape Lambda dans le pipeline SageMaker.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

Dans la définition du pipeline, vous pouvez ensuite définir les paramètres du pipeline SageMaker qui sont d'un type de données spécifique et définir la variable sur la sortie de la fonction Lambda :

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Scénario 2 : transmettre la sortie (types de données non primitifs) d'une étape Lambda à une étape de traitement

Les types de données non primitifs font référence à des types de données non scalaires (par exemple, NamedTuple). Vous pouvez avoir un scénario dans lequel vous devez renvoyer un type de données non primitif à partir d'une fonction Lambda. Pour ce faire, vous devez convertir votre type de données non primitif en chaîne :

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Vous pouvez ensuite utiliser cette chaîne comme entrée pour une étape ultérieure du pipeline. Pour utiliser le tuple nommé dans le code, utilisez eval() pour analyser l'expression Python dans la chaîne :

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Scénario 3 : transmettre la sortie d'une étape via un fichier de propriétés

Vous pouvez également stocker le résultat d'une étape de traitement dans un fichier JSON de propriété pour la consommation en aval dans un ConditionStep ou un autre ProcessingStep. Vous pouvez utiliser le Fonction JSONGet pour interroger un fichier de propriété. Voir le code suivant:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

Supposons que le contenu du fichier de propriétés soit le suivant :

{ "hyperparam": { "eta": { "value": 0.6 } }
}

Dans ce cas, il peut être interrogé pour une valeur spécifique et utilisé dans les étapes suivantes à l'aide de la fonction JsonGet :

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Paramétrer une variable dans la définition du pipeline

Paramétrer les variables afin qu'elles puissent être utilisées au moment de l'exécution est souvent souhaitable, par exemple pour construire un URI S3. Vous pouvez paramétrer une chaîne de telle sorte qu'elle soit évaluée au moment de l'exécution à l'aide de l'option Join fonction. L'extrait de code suivant montre comment définir la variable à l'aide du Join fonction et utilisez-la pour définir l'emplacement de sortie dans une étape de traitement :

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Exécuter du code parallèle sur un itérable

Certains workflows ML exécutent du code en boucles for parallèles sur un ensemble statique d'éléments (un itérable). Il peut s'agir soit du même code exécuté sur des données différentes, soit d'un morceau de code différent qui doit être exécuté pour chaque élément. Par exemple, si vous avez un très grand nombre de lignes dans un fichier et que vous souhaitez accélérer le temps de traitement, vous pouvez vous appuyer sur l'ancien modèle. Si vous souhaitez effectuer différentes transformations sur des sous-groupes spécifiques des données, vous devrez peut-être exécuter un morceau de code différent pour chaque sous-groupe des données. Les deux scénarios suivants illustrent comment concevoir des pipelines SageMaker à cet effet.

Scénario 1 : Implémenter une logique de traitement sur différentes portions de données

Vous pouvez exécuter une tâche de traitement avec plusieurs instances (en définissant instance_count à une valeur supérieure à 1). Cela distribue les données d'entrée d'Amazon S3 dans toutes les instances de traitement. Vous pouvez ensuite utiliser un script (process.py) pour travailler sur une partie spécifique des données en fonction du numéro d'instance et de l'élément correspondant dans la liste des éléments. La logique de programmation dans process.py peut être écrite de telle sorte qu'un module ou un morceau de code différent soit exécuté en fonction de la liste des éléments qu'il traite. L'exemple suivant définit un processeur qui peut être utilisé dans un ProcessingStep :

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Scénario 2 : Exécuter une séquence d'étapes

Lorsque vous disposez d'une séquence d'étapes qui doivent être exécutées en parallèle, vous pouvez définir chaque séquence comme un pipeline SageMaker indépendant. L'exécution de ces pipelines SageMaker peut ensuite être déclenchée à partir d'une fonction Lambda faisant partie d'un LambdaStep dans le pipeline parent. Le morceau de code suivant illustre le scénario dans lequel deux exécutions de pipeline SageMaker différentes sont déclenchées :

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

Conclusion

Dans cet article, nous avons discuté de quelques bonnes pratiques pour une utilisation et une maintenance efficaces des pipelines SageMaker. Nous avons également fourni certains modèles que vous pouvez adopter lors de la conception de flux de travail avec SageMaker Pipelines, que vous créiez de nouveaux pipelines ou migrez des flux de travail ML à partir d'autres outils d'orchestration. Pour démarrer avec l'orchestration de flux de travail SageMaker Pipelines for ML, reportez-vous au exemples de code sur GitHub et de Pipelines de création de modèles Amazon SageMaker.


À propos des auteurs

Bonnes pratiques et modèles de conception pour la création de flux de travail d'apprentissage automatique avec Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Pinak Panigrahi travaille avec les clients pour créer des solutions basées sur l'apprentissage automatique afin de résoudre les problèmes commerciaux stratégiques sur AWS. Lorsqu’il n’est pas occupé par l’apprentissage automatique, on peut le trouver en train de faire une randonnée, de lire un livre ou de regarder du sport.

Bonnes pratiques et modèles de conception pour la création de flux de travail d'apprentissage automatique avec Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Meenakshisundaram Thandavarayan travaille pour AWS en tant que spécialiste IA/ML. Il est passionné par la conception, la création et la promotion d'expériences de données et d'analyse centrées sur l'humain. Meena se concentre sur le développement de systèmes durables qui offrent des avantages compétitifs mesurables aux clients stratégiques d'AWS. Meena est un connecteur, un penseur du design et s'efforce de conduire les entreprises vers de nouvelles façons de travailler grâce à l'innovation, l'incubation et la démocratisation.

Horodatage:

Plus de Apprentissage automatique AWS