Best practice e modelli di progettazione per la creazione di flussi di lavoro di machine learning con Amazon SageMaker Pipelines | Servizi Web di Amazon

Best practice e modelli di progettazione per la creazione di flussi di lavoro di machine learning con Amazon SageMaker Pipelines | Servizi Web di Amazon

Pipeline di Amazon SageMaker è un servizio AWS completamente gestito per la creazione e l'orchestrazione di flussi di lavoro di machine learning (ML). SageMaker Pipelines offre agli sviluppatori di applicazioni ML la possibilità di orchestrare diverse fasi del flusso di lavoro ML, tra cui caricamento dei dati, trasformazione dei dati, formazione, ottimizzazione e distribuzione. Puoi utilizzare SageMaker Pipelines per orchestrare lavori ML in SageMaker e suoi integrazione con l'ecosistema AWS più ampio ti consente anche di utilizzare risorse come AWS Lambda Amazon EMR posti di lavoro e altro ancora. Ciò ti consente di creare una pipeline personalizzata e riproducibile per requisiti specifici nei tuoi flussi di lavoro ML.

In questo post forniamo alcune best practice per massimizzare il valore di SageMaker Pipelines e rendere l'esperienza di sviluppo fluida. Discuteremo anche alcuni scenari e modelli di progettazione comuni durante la creazione di SageMaker Pipelines e forniremo esempi per affrontarli.

Migliori pratiche per le pipeline SageMaker

In questa sezione, discutiamo alcune best practice che possono essere seguite durante la progettazione dei flussi di lavoro utilizzando SageMaker Pipelines. La loro adozione può migliorare il processo di sviluppo e semplificare la gestione operativa di SageMaker Pipelines.

Utilizzare la sessione pipeline per il caricamento lento della pipeline

Sessione della pipeline abilita l'inizializzazione lenta delle risorse della pipeline (i lavori non vengono avviati fino al runtime della pipeline). IL PipelineSession contesto eredita il Sessione SageMaker e implementa metodi convenienti per interagire con altre entità e risorse SageMaker, come lavori di formazione, endpoint, set di dati di input in Servizio di archiviazione semplice Amazon (Amazon S3) e così via. Quando definisci le pipeline SageMaker, dovresti utilizzare PipelineSession durante la normale sessione SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Esegui pipeline in modalità locale per iterazioni rapide e convenienti durante lo sviluppo

Puoi eseguire un file pipeline in modalità locale usando il LocalPipelineSession contesto. In questa modalità, la pipeline e i lavori vengono eseguiti localmente utilizzando le risorse sul computer locale, anziché le risorse gestite da SageMaker. La modalità locale fornisce un modo conveniente per eseguire l'iterazione del codice della pipeline con un sottoinsieme di dati più piccolo. Dopo che la pipeline è stata testata localmente, può essere ridimensionata per l'esecuzione utilizzando il file Pipeline Session contesto.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Gestisci una pipeline SageMaker tramite il controllo delle versioni

Il controllo delle versioni degli artefatti e delle definizioni della pipeline è un requisito comune nel ciclo di vita dello sviluppo. È possibile creare più versioni della pipeline denominando gli oggetti pipeline con un prefisso o suffisso univoco, il più comune dei quali è un timestamp, come mostrato nel codice seguente:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organizza e monitora le esecuzioni della pipeline SageMaker integrandosi con SageMaker Experiments

SageMaker Pipelines può essere facilmente integrato con Esperimenti di SageMaker per organizzare e monitoraggio delle corse della pipeline. Ciò si ottiene specificando PipelineExperimentConfig al momento della creazione di a oggetto pipeline. Con questo oggetto di configurazione puoi specificare un nome per l'esperimento e un nome per la prova. I dettagli dell'esecuzione di una pipeline SageMaker vengono organizzati in base all'esperimento e alla prova specificati. Se non specifichi esplicitamente il nome dell'esperimento, per il nome dell'esperimento viene utilizzato il nome di una pipeline. Allo stesso modo, se non si specifica esplicitamente un nome di prova, viene utilizzato un ID di esecuzione della pipeline per il nome della prova o del gruppo di esecuzione. Vedere il seguente codice:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Esegui in modo sicuro le pipeline SageMaker all'interno di un VPC privato

Per proteggere i carichi di lavoro ML, è consigliabile distribuire i lavori orchestrati da SageMaker Pipelines in una configurazione di rete sicura all'interno di un VPC privato, sottoreti private e gruppi di sicurezza. Per garantire e imporre l'utilizzo di questo ambiente sicuro, è possibile implementare quanto segue Gestione dell'identità e dell'accesso di AWS (IAM) politica per il Ruolo di esecuzione di SageMaker (questo è il ruolo assunto dalla pipeline durante la sua corsa). Puoi anche aggiungere la policy per eseguire i lavori orchestrati da SageMaker Pipelines in modalità di isolamento della rete.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

Per un esempio di implementazione della pipeline con questi controlli di sicurezza in atto, fare riferimento a Orchestrazione di lavori, registrazione di modelli e distribuzione continua con Amazon SageMaker in un ambiente sicuro.

Monitora il costo delle esecuzioni della pipeline utilizzando i tag

L'utilizzo delle pipeline SageMaker da solo è gratuito; paghi per le risorse di elaborazione e archiviazione che avvii come parte delle singole fasi della pipeline come elaborazione, formazione e inferenza batch. Per aggregare i costi per esecuzione della pipeline, puoi includere tag in ogni passaggio della pipeline che crea una risorsa. È quindi possibile fare riferimento a questi tag nell'Explorer costi per filtrare e aggregare il costo totale di esecuzione della pipeline, come mostrato nell'esempio seguente:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

Dall'esploratore dei costi, ora puoi ottenere il costo filtrato dal tag:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Modelli di progettazione per alcuni scenari comuni

In questa sezione discuteremo i modelli di progettazione per alcuni casi d'uso comuni con SageMaker Pipelines.

Esegui una funzione Python leggera utilizzando un passaggio Lambda

Le funzioni Python sono onnipresenti nei flussi di lavoro ML; vengono utilizzati nella preelaborazione, nella postelaborazione, nella valutazione e altro ancora. Lambda è un servizio di elaborazione serverless che ti consente di eseguire codice senza effettuare il provisioning o gestire i server. Con Lambda puoi eseguire il codice nel tuo linguaggio preferito che include Python. Puoi usarlo per eseguire codice Python personalizzato come parte della tua pipeline. Un passo Lambda ti consente di eseguire funzioni Lambda come parte della pipeline SageMaker. Inizia con il seguente codice:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Crea la funzione Lambda usando il Helper Lambda di SageMaker Python SDK:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Richiama il passaggio Lambda:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Passare i dati tra i passaggi

I dati di input per un passaggio della pipeline sono una posizione dati accessibile o dati generati da uno dei passaggi precedenti della pipeline. È possibile fornire queste informazioni come a ProcessingInput parametro. Diamo un'occhiata ad alcuni scenari su come utilizzare ProcessingInput.

Scenario 1: passa l'output (tipi di dati primitivi) di una fase Lambda a una fase di elaborazione

I tipi di dati primitivi si riferiscono a tipi di dati scalari come stringa, intero, booleano e float.

Il seguente frammento di codice definisce una funzione Lambda che restituisce un dizionario di variabili con tipi di dati primitivi. Il codice della funzione Lambda restituirà un JSON di coppie chiave-valore quando richiamato dalla fase Lambda all'interno della pipeline SageMaker.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

Nella definizione della pipeline, puoi quindi definire i parametri della pipeline SageMaker che appartengono a un tipo di dati specifico e impostare la variabile sull'output della funzione Lambda:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Scenario 2: passa l'output (tipi di dati non primitivi) di una fase Lambda a una fase di elaborazione

I tipi di dati non primitivi si riferiscono a tipi di dati non scalari (ad esempio, NamedTuple). Potresti avere uno scenario in cui devi restituire un tipo di dati non primitivo da una funzione Lambda. Per fare ciò, devi convertire il tuo tipo di dati non primitivo in una stringa:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Quindi puoi utilizzare questa stringa come input per un passaggio successivo nella pipeline. Per utilizzare la tupla denominata nel codice, utilizzare eval() per analizzare l'espressione Python nella stringa:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Scenario 3: passare l'output di un passaggio attraverso un file delle proprietà

È inoltre possibile memorizzare l'output di una fase di elaborazione in un file file JSON delle proprietà per il consumo a valle in a ConditionStep oppure un'altra ProcessingStep. Puoi usare il Funzione JSONGet interrogare a file delle proprietà. Vedi il seguente codice:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

Supponiamo che il contenuto del file delle proprietà sia il seguente:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

In questo caso, è possibile interrogarlo per un valore specifico e utilizzarlo nei passaggi successivi utilizzando la funzione JsonGet:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Parametrizzare una variabile nella definizione della pipeline

La parametrizzazione delle variabili in modo che possano essere utilizzate in fase di esecuzione è spesso auspicabile, ad esempio per costruire un URI S3. È possibile parametrizzare una stringa in modo tale che venga valutata in fase di esecuzione utilizzando il file Join funzione. Il seguente frammento di codice mostra come definire la variabile utilizzando il file Join funzione e utilizzarla per impostare la posizione di output in una fase di elaborazione:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Esegui codice parallelo su un iterabile

Alcuni flussi di lavoro ML eseguono codice in cicli for paralleli su un insieme statico di elementi (an iterabile). Può trattarsi dello stesso codice che viene eseguito su dati diversi o di una parte di codice diversa che deve essere eseguita per ciascun elemento. Ad esempio, se hai un numero molto elevato di righe in un file e desideri accelerare il tempo di elaborazione, puoi fare affidamento sul modello precedente. Se desideri eseguire trasformazioni diverse su sottogruppi specifici nei dati, potresti dover eseguire una parte di codice diversa per ogni sottogruppo nei dati. I due scenari seguenti illustrano come progettare pipeline SageMaker per questo scopo.

Scenario 1: implementare una logica di elaborazione su diverse porzioni di dati

È possibile eseguire un processo di elaborazione con più istanze (impostando instance_count ad un valore maggiore di 1). Questo distribuisce i dati di input da Amazon S3 in tutte le istanze di elaborazione. È quindi possibile utilizzare uno script (process.py) per lavorare su una porzione specifica dei dati in base al numero di istanza e all'elemento corrispondente nell'elenco di elementi. La logica di programmazione in process.py può essere scritta in modo tale che venga eseguito un modulo o un pezzo di codice diverso a seconda dell'elenco di elementi che elabora. L'esempio seguente definisce un processore che può essere utilizzato in un ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Scenario 2: eseguire una sequenza di passaggi

Quando disponi di una sequenza di passaggi che devono essere eseguiti in parallelo, puoi definire ciascuna sequenza come una pipeline SageMaker indipendente. L'esecuzione di queste pipeline SageMaker può quindi essere attivata da una funzione Lambda che fa parte di a LambdaStep nella pipeline madre. La seguente parte di codice illustra lo scenario in cui vengono attivate due diverse esecuzioni della pipeline SageMaker:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

Conclusione

In questo post abbiamo discusso alcune best practice per l'uso e la manutenzione efficienti delle pipeline SageMaker. Abbiamo inoltre fornito alcuni modelli che puoi adottare durante la progettazione di flussi di lavoro con SageMaker Pipelines, sia che tu stia creando nuove pipeline o eseguendo la migrazione di flussi di lavoro ML da altri strumenti di orchestrazione. Per iniziare con SageMaker Pipelines per l'orchestrazione del flusso di lavoro ML, fare riferimento a esempi di codice su GitHub ed Condutture per la costruzione di modelli Amazon SageMaker.


Informazioni sugli autori

Best practice e modelli di progettazione per la creazione di flussi di lavoro di machine learning con Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Pinak Panigrahi collabora con i clienti per creare soluzioni basate sull'apprendimento automatico per risolvere problemi aziendali strategici su AWS. Quando non è occupato con l'apprendimento automatico, lo si può trovare mentre fa un'escursione, legge un libro o guarda uno sport.

Best practice e modelli di progettazione per la creazione di flussi di lavoro di machine learning con Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Meenakshisundaram Tandavarayan lavora per AWS come specialista AI/ML. Ha una passione per progettare, creare e promuovere esperienze di analisi e dati incentrate sull'uomo. Meena si concentra sullo sviluppo di sistemi sostenibili che offrano vantaggi misurabili e competitivi per i clienti strategici di AWS. Meena è un connettore, un pensatore di design e si impegna a guidare il business verso nuovi modi di lavorare attraverso l'innovazione, l'incubazione e la democratizzazione.

Timestamp:

Di più da Apprendimento automatico di AWS