Cele mai bune practici și modele de proiectare pentru crearea fluxurilor de lucru de învățare automată cu Amazon SageMaker Pipelines | Amazon Web Services

Cele mai bune practici și modele de proiectare pentru crearea fluxurilor de lucru de învățare automată cu Amazon SageMaker Pipelines | Amazon Web Services

Pipelines Amazon SageMaker este un serviciu AWS complet gestionat pentru construirea și orchestrarea fluxurilor de lucru de învățare automată (ML). SageMaker Pipelines oferă dezvoltatorilor de aplicații ML capacitatea de a orchestra diferiți pași ai fluxului de lucru ML, inclusiv încărcarea datelor, transformarea datelor, antrenament, reglare și implementare. Puteți utiliza SageMaker Pipelines pentru a orchestra lucrări ML în SageMaker și în acesta integrarea cu ecosistemul AWS mai mare de asemenea, vă permite să utilizați resurse precum AWS Lambdas funcții, Amazon EMR locuri de muncă și multe altele. Acest lucru vă permite să construiți o conductă personalizată și reproductibilă pentru cerințe specifice în fluxurile dvs. de lucru ML.

În această postare, oferim câteva bune practici pentru a maximiza valoarea SageMaker Pipelines și pentru a face experiența de dezvoltare fără probleme. De asemenea, discutăm câteva scenarii și modele comune de proiectare atunci când construim SageMaker Pipelines și oferim exemple pentru abordarea acestora.

Cele mai bune practici pentru SageMaker Pipelines

În această secțiune, discutăm câteva dintre cele mai bune practici care pot fi urmate la proiectarea fluxurilor de lucru folosind SageMaker Pipelines. Adoptarea acestora poate îmbunătăți procesul de dezvoltare și eficientiza managementul operațional al SageMaker Pipelines.

Utilizați Pipeline Session pentru încărcarea leneșă a conductei

Sesiunea Pipeline permite inițializarea lenenă a resurselor pipeline (lucrările nu sunt pornite până la runtime pipeline). The PipelineSession contextul moștenește Sesiune SageMaker și implementează metode convenabile pentru interacțiunea cu alte entități și resurse SageMaker, cum ar fi joburi de instruire, puncte finale, seturi de date de intrare în Serviciul Amazon de stocare simplă (Amazon S3) și așa mai departe. Când definiți SageMaker Pipelines, ar trebui să utilizați PipelineSession peste sesiunea obișnuită SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Rulați conducte în modul local pentru iterații rapide și rentabile în timpul dezvoltării

Puteți rula un conductă în modul local folosind LocalPipelineSession context. În acest mod, conducta și joburile sunt rulate local folosind resurse de pe mașina locală, în loc de resursele gestionate de SageMaker. Modul local oferă o modalitate rentabilă de a repeta codul conductei cu un subset mai mic de date. După ce conducta este testată local, poate fi scalată pentru a rula folosind PipelineSession context.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Gestionați o conductă SageMaker prin versiunea

Versiunea artefactelor și a definițiilor conductei este o cerință comună în ciclul de viață al dezvoltării. Puteți crea mai multe versiuni ale conductei prin denumirea obiectelor pipeline cu un prefix sau sufix unic, cel mai comun fiind un marcaj de timp, așa cum se arată în următorul cod:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organizați și urmăriți rulările pipeline SageMaker prin integrarea cu SageMaker Experiments

SageMaker Pipelines poate fi ușor integrat cu Experimentele SageMaker pentru organizare şi urmărirea curselor de conducte. Acest lucru se realizează prin specificare PipelineExperimentConfig la momentul creării unui obiect conductă. Cu acest obiect de configurare, puteți specifica un nume de experiment și un nume de încercare. Detaliile rulării unei conducte SageMaker sunt organizate în cadrul experimentului și testului specificat. Dacă nu specificați în mod explicit un nume de experiment, se folosește un nume de canalizare pentru numele experimentului. În mod similar, dacă nu specificați în mod explicit un nume de încercare, se folosește un ID de rulare pipeline pentru numele de încercare sau grup de rulare. Vezi următorul cod:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Rulați în siguranță conductele SageMaker într-un VPC privat

Pentru a securiza sarcinile de lucru ML, este cea mai bună practică să implementați joburile orchestrate de SageMaker Pipelines într-o configurație de rețea securizată într-un VPC privat, subrețele private și grupuri de securitate. Pentru a asigura și a impune utilizarea acestui mediu securizat, puteți implementa următoarele Gestionarea identității și accesului AWS (IAM) pentru Rol de execuție SageMaker (acesta este rolul asumat de conductă în timpul rulării sale). De asemenea, puteți adăuga politica pentru a rula joburile orchestrate de SageMaker Pipelines în modul de izolare a rețelei.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

Pentru un exemplu de implementare a conductei cu aceste controale de securitate, consultați Orchestrarea joburilor, înregistrarea modelului și implementarea continuă cu Amazon SageMaker într-un mediu securizat.

Monitorizați costul rulajelor conductelor folosind etichete

Utilizarea conductelor SageMaker în sine este gratuită; plătiți pentru resursele de calcul și stocare pe care le derulați ca parte a pașilor individuali, cum ar fi procesarea, antrenamentul și inferența pe lot. Pentru a cumula costurile per cursă de conductă, puteți include tag-uri în fiecare pas al conductei care creează o resursă. Aceste etichete pot fi apoi menționate în exploratorul de costuri pentru a filtra și a agrega costul total de rulare a conductei, așa cum se arată în exemplul următor:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

Din exploratorul de costuri, acum puteți obține costul filtrat după etichetă:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Modele de proiectare pentru unele scenarii comune

În această secțiune, discutăm modelele de proiectare pentru unele cazuri de utilizare obișnuite cu SageMaker Pipelines.

Rulați o funcție Python ușoară folosind un pas Lambda

Funcțiile Python sunt omniprezente în fluxurile de lucru ML; sunt utilizate în preprocesare, postprocesare, evaluare și multe altele. Lambda este un serviciu de calcul fără server care vă permite să rulați cod fără a furniza sau gestiona servere. Cu Lambda, puteți rula cod în limba preferată care include Python. Puteți folosi acest lucru pentru a rula cod Python personalizat ca parte a conductei dvs. Un pas Lambda vă permite să rulați funcții Lambda ca parte a conductei dvs. SageMaker. Începeți cu următorul cod:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Creați funcția Lambda folosind Asistentul Lambda al SageMaker Python SDK:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Apelați pasul Lambda:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Transmite date între pași

Datele de intrare pentru un pas de conductă sunt fie o locație de date accesibilă, fie date generate de unul dintre pașii anteriori din conductă. Puteți furniza aceste informații ca a ProcessingInput parametru. Să ne uităm la câteva scenarii despre cum puteți utiliza ProcessingInput.

Scenariul 1: treceți rezultatul (tipurile de date primitive) unui pas Lambda unui pas de procesare

Tipurile de date primitive se referă la tipuri de date scalare precum șir, întreg, boolean și float.

Următorul fragment de cod definește o funcție Lambda care returnează un dicționar de variabile cu tipuri de date primitive. Codul funcției Lambda va returna un JSON de perechi cheie-valoare atunci când este invocat din pasul Lambda din conducta SageMaker.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

În definiția conductei, puteți defini apoi parametrii conductei SageMaker care sunt de un anumit tip de date și puteți seta variabila la ieșirea funcției Lambda:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Scenariul 2: treceți rezultatul (tipuri de date neprimitive) unui pas Lambda unui pas de procesare

Tipurile de date non-primitive se referă la tipuri de date non-scalare (de exemplu, NamedTuple). Este posibil să aveți un scenariu în care trebuie să returnați un tip de date non-primitive dintr-o funcție Lambda. Pentru a face acest lucru, trebuie să convertiți tipul dvs. de date non-primitive într-un șir:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Apoi puteți utiliza acest șir ca intrare pentru un pas ulterior din conductă. Pentru a utiliza tuplul numit în cod, utilizați eval() pentru a analiza expresia Python din șir:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Scenariul 3: treceți rezultatul unui pas printr-un fișier de proprietăți

De asemenea, puteți stoca rezultatul unui pas de procesare în a fișier JSON de proprietate pentru consum în aval în a ConditionStep sau alt ProcessingStep. Puteți utiliza Funcția JSONGet a interoga a fișier de proprietate. Consultați următorul cod:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

Să presupunem că conținutul fișierului de proprietate a fost următorul:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

În acest caz, poate fi interogat pentru o anumită valoare și utilizat în pașii următori folosind funcția JsonGet:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Parametrizați o variabilă în definiția conductei

Parametrizarea variabilelor astfel încât să poată fi utilizate în timpul execuției este adesea de dorit - de exemplu, pentru a construi un URI S3. Puteți parametriza un șir astfel încât să fie evaluat în timpul execuției folosind Join funcţie. Următorul fragment de cod arată cum se definește variabila folosind Join funcția și utilizați-o pentru a seta locația de ieșire într-o etapă de procesare:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Rulați cod paralel peste un iterabil

Unele fluxuri de lucru ML rulează cod în bucle for paralele peste un set static de elemente (un iterabil). Poate fi fie același cod care se rulează pe date diferite, fie o bucată diferită de cod care trebuie rulată pentru fiecare articol. De exemplu, dacă aveți un număr foarte mare de rânduri într-un fișier și doriți să accelerați timpul de procesare, vă puteți baza pe modelul anterior. Dacă doriți să efectuați transformări diferite pe anumite subgrupuri din date, ar putea fi necesar să rulați o bucată diferită de cod pentru fiecare subgrup din date. Următoarele două scenarii ilustrează modul în care puteți proiecta conducte SageMaker în acest scop.

Scenariul 1: implementați o logică de procesare pe diferite porțiuni de date

Puteți rula un job de procesare cu mai multe instanțe (prin setarea instance_count la o valoare mai mare de 1). Aceasta distribuie datele de intrare de la Amazon S3 în toate instanțele de procesare. Puteți utiliza apoi un script (process.py) pentru a lucra la o anumită porțiune a datelor pe baza numărului instanței și a elementului corespunzător din lista de articole. Logica de programare din process.py poate fi scrisă astfel încât să fie rulat un modul sau o bucată de cod diferit, în funcție de lista de elemente pe care le procesează. Următorul exemplu definește un procesor care poate fi utilizat într-un ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Scenariul 2: rulați o secvență de pași

Când aveți o secvență de pași care trebuie rulați în paralel, puteți defini fiecare secvență ca o conductă SageMaker independentă. Executarea acestor conducte SageMaker poate fi apoi declanșată dintr-o funcție Lambda care face parte dintr-un LambdaStep în conducta-mamă. Următoarea bucată de cod ilustrează scenariul în care sunt declanșate două rulări diferite de conducte SageMaker:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

Concluzie

În această postare, am discutat câteva bune practici pentru utilizarea și întreținerea eficientă a conductelor SageMaker. De asemenea, am furnizat anumite modele pe care le puteți adopta în timp ce proiectați fluxuri de lucru cu SageMaker Pipelines, indiferent dacă creați noi conducte sau migrați fluxuri de lucru ML din alte instrumente de orchestrare. Pentru a începe cu SageMaker Pipelines pentru orchestrarea fluxului de lucru ML, consultați mostre de cod pe GitHub și Amazon SageMaker Model Building Pipelines.


Despre Autori

Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pinak Panigrahi lucrează cu clienții pentru a construi soluții bazate pe învățarea automată pentru a rezolva probleme strategice de afaceri pe AWS. Când nu este ocupat cu învățarea automată, el poate fi găsit făcând o excursie, citind o carte sau urmărind sporturi.

Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Meenakshisundaram Thandavarayan lucrează pentru AWS ca specialist AI/ML. Are o pasiune de a proiecta, crea și promova experiențe de analiză și date centrate pe om. Meena se concentrează pe dezvoltarea de sisteme durabile care oferă avantaje măsurabile și competitive pentru clienții strategici ai AWS. Meena este un conector, gânditor de design și se străduiește să conducă afacerile către noi moduri de lucru prin inovare, incubare și democratizare.

Timestamp-ul:

Mai mult de la Învățare automată AWS