Beste praksis og designmønstre for å bygge arbeidsflyter for maskinlæring med Amazon SageMaker Pipelines | Amazon Web Services

Beste praksis og designmønstre for å bygge arbeidsflyter for maskinlæring med Amazon SageMaker Pipelines | Amazon Web Services

Amazon SageMaker-rørledninger er en fullstendig administrert AWS-tjeneste for å bygge og orkestrere arbeidsflyter for maskinlæring (ML). SageMaker Pipelines tilbyr ML-applikasjonsutviklere muligheten til å orkestrere ulike trinn i ML-arbeidsflyten, inkludert datalasting, datatransformasjon, opplæring, tuning og distribusjon. Du kan bruke SageMaker Pipelines til å orkestrere ML-jobber i SageMaker, og dens integrasjon med det større AWS-økosystemet lar deg også bruke ressurser som AWS Lambda funksjoner, Amazon EMR jobber og mer. Dette lar deg bygge en tilpasset og reproduserbar pipeline for spesifikke krav i ML-arbeidsflytene dine.

I dette innlegget gir vi noen beste fremgangsmåter for å maksimere verdien av SageMaker Pipelines og gjøre utviklingsopplevelsen sømløs. Vi diskuterer også noen vanlige designscenarier og mønstre når vi bygger SageMaker Pipelines og gir eksempler for å håndtere dem.

Beste praksis for SageMaker Pipelines

I denne delen diskuterer vi noen beste fremgangsmåter som kan følges mens du designer arbeidsflyter ved å bruke SageMaker Pipelines. Å ta i bruk dem kan forbedre utviklingsprosessen og effektivisere den operative styringen av SageMaker Pipelines.

Bruk Pipeline Session for lat lasting av rørledningen

Pipeline økt muliggjør lat initialisering av pipeline-ressurser (jobbene startes ikke før pipeline-kjøring). De PipelineSession konteksten arver SageMaker-økt og implementerer praktiske metoder for å samhandle med andre SageMaker-enheter og ressurser, for eksempel opplæringsjobber, endepunkter, inndatasett i Amazon enkel lagringstjeneste (Amazon S3), og så videre. Når du definerer SageMaker Pipelines, bør du bruke PipelineSession over den vanlige SageMaker-økten:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Kjør rørledninger i lokal modus for kostnadseffektive og raske iterasjoner under utvikling

Du kan kjøre en rørledning i lokal modus bruker LocalPipelineSession kontekst. I denne modusen kjøres rørledningen og jobbene lokalt ved å bruke ressurser på den lokale maskinen, i stedet for SageMaker-administrerte ressurser. Lokal modus gir en kostnadseffektiv måte å iterere på rørledningskoden med et mindre delsett av data. Etter at rørledningen er testet lokalt, kan den skaleres til å kjøre ved hjelp av PipelineSession kontekst.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Administrer en SageMaker-pipeline gjennom versjonering

Versjon av artefakter og rørledningsdefinisjoner er et vanlig krav i utviklingslivssyklusen. Du kan lage flere versjoner av rørledningen ved å navngi rørledningsobjekter med et unikt prefiks eller suffiks, den vanligste er et tidsstempel, som vist i følgende kode:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organiser og spor SageMaker-pipelinekjøringer ved å integrere med SageMaker Experiments

SageMaker Pipelines kan enkelt integreres med SageMaker eksperimenter for organisering og sporing av rørledninger. Dette oppnås ved å spesifisere PipelineExperimentConfig på tidspunktet for opprettelse av en rørledningsobjekt. Med dette konfigurasjonsobjektet kan du spesifisere et eksperimentnavn og et prøvenavn. Kjøringsdetaljene til en SageMaker-pipeline blir organisert under det spesifiserte eksperimentet og prøven. Hvis du ikke eksplisitt spesifiserer et eksperimentnavn, brukes et pipelinenavn for eksperimentnavnet. På samme måte, hvis du ikke eksplisitt spesifiserer et prøvenavn, brukes en pipeline-kjørings-ID for prøve- eller kjøringsgruppenavnet. Se følgende kode:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Kjør SageMaker-rørledninger på en sikker måte i en privat VPC

For å sikre ML-arbeidsbelastningene er det en beste praksis å distribuere jobbene orkestrert av SageMaker Pipelines i en sikker nettverkskonfigurasjon innenfor en privat VPC, private undernett og sikkerhetsgrupper. For å sikre og håndheve bruken av dette sikre miljøet, kan du implementere følgende AWS identitets- og tilgangsadministrasjon (IAM) policy for SageMaker utførelsesrolle (dette er rollen som rørledningen påtar seg under kjøringen). Du kan også legge til policyen for å kjøre jobbene orkestrert av SageMaker Pipelines i nettverksisolasjonsmodus.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

For et eksempel på rørledningsimplementering med disse sikkerhetskontrollene på plass, se Orkestrere jobber, modellregistrering og kontinuerlig distribusjon med Amazon SageMaker i et sikkert miljø.

Overvåk kostnadene for rørledningskjøringer ved å bruke tagger

Å bruke SageMaker-rørledninger i seg selv er gratis; du betaler for data- og lagringsressursene du samler opp som en del av de individuelle pipeline-trinnene som prosessering, opplæring og batch-slutning. For å aggregere kostnadene per rørledningskjøring kan du inkludere tags i hvert pipelinetrinn som skaper en ressurs. Disse kodene kan deretter refereres til i kostnadsutforskeren for å filtrere og samle totale kostnader for pipelinekjøring, som vist i følgende eksempel:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

Fra kostnadsutforskeren kan du nå få kostnadene filtrert etter taggen:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Design mønstre for noen vanlige scenarier

I denne delen diskuterer vi designmønstre for noen vanlige brukstilfeller med SageMaker Pipelines.

Kjør en lett Python-funksjon ved å bruke et Lambda-trinn

Python-funksjoner er allestedsnærværende i ML-arbeidsflyter; de brukes i forbehandling, etterbehandling, evaluering og mer. Lambda er en serverløs databehandlingstjeneste som lar deg kjøre kode uten å klargjøre eller administrere servere. Med Lambda kan du kjøre kode på ditt foretrukne språk som inkluderer Python. Du kan bruke dette til å kjøre egendefinert Python-kode som en del av rørledningen din. Et Lambda-trinn lar deg kjøre Lambda-funksjoner som en del av din SageMaker-pipeline. Start med følgende kode:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Lag Lambda-funksjonen ved å bruke SageMaker Python SDKs Lambda-hjelper:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Ring Lambda-trinnet:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Send data mellom trinnene

Inndata for et rørledningstrinn er enten en tilgjengelig dataplassering eller data generert av et av de foregående trinnene i rørledningen. Du kan gi denne informasjonen som en ProcessingInput parameter. La oss se på noen få scenarier for hvordan du kan bruke ProcessingInput.

Scenario 1: Send utdataene (primitive datatyper) fra et Lambda-trinn til et behandlingstrinn

Primitive datatyper refererer til skalardatatyper som streng, heltall, boolsk og flytende.

Følgende kodebit definerer en Lambda-funksjon som returnerer en ordbok med variabler med primitive datatyper. Lambda-funksjonskoden din vil returnere en JSON med nøkkelverdi-par når den påkalles fra Lambda-trinnet i SageMaker-pipelinen.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

I pipeline-definisjonen kan du deretter definere SageMaker pipeline-parametere som er av en spesifikk datatype og sette variabelen til utdata fra Lambda-funksjonen:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Scenario 2: Send utdata (ikke-primitive datatyper) fra et Lambda-trinn til et behandlingstrinn

Ikke-primitive datatyper refererer til ikke-skalære datatyper (f.eks. NamedTuple). Du kan ha et scenario når du må returnere en ikke-primitiv datatype fra en Lambda-funksjon. For å gjøre dette, må du konvertere din ikke-primitive datatype til en streng:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Deretter kan du bruke denne strengen som input til et påfølgende trinn i pipelinen. For å bruke den navngitte tuppelen i koden, bruk eval() for å analysere Python-uttrykket i strengen:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Scenario 3: Send utdataene fra et trinn gjennom en egenskapsfil

Du kan også lagre utdata fra et behandlingstrinn i en egenskap JSON-fil for nedstrøms forbruk i en ConditionStep eller en annen ProcessingStep. Du kan bruke JSONGet funksjon å spørre a eiendomsfil. Se følgende kode:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

La oss anta at egenskapsfilens innhold var følgende:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

I dette tilfellet kan den spørres etter en bestemt verdi og brukes i påfølgende trinn ved å bruke JsonGet-funksjonen:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Parameteriser en variabel i pipelinedefinisjonen

Parametrisering av variabler slik at de kan brukes under kjøring er ofte ønskelig - for eksempel for å konstruere en S3 URI. Du kan parameterisere en streng slik at den blir evaluert under kjøring ved å bruke Join funksjon. Følgende kodebit viser hvordan du definerer variabelen ved å bruke Join funksjon og bruk den til å angi utdataplasseringen i et behandlingstrinn:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Kjør parallell kode over en iterabel

Noen ML-arbeidsflyter kjører kode i parallelle for-løkker over et statisk sett med elementer (en iterabel). Det kan enten være den samme koden som kjøres på forskjellige data eller en annen kodebit som må kjøres for hvert element. For eksempel, hvis du har et veldig stort antall rader i en fil og ønsker å fremskynde behandlingstiden, kan du stole på det tidligere mønsteret. Hvis du vil utføre forskjellige transformasjoner på spesifikke undergrupper i dataene, må du kanskje kjøre en annen kode for hver undergruppe i dataene. De følgende to scenariene illustrerer hvordan du kan designe SageMaker-rørledninger for dette formålet.

Scenario 1: Implementer en behandlingslogikk på forskjellige deler av data

Du kan kjøre en behandlingsjobb med flere forekomster (ved å angi instance_count til en verdi større enn 1). Dette distribuerer inndataene fra Amazon S3 til alle behandlingsforekomstene. Du kan deretter bruke et skript (process.py) for å arbeide med en bestemt del av dataene basert på forekomstnummeret og det tilsvarende elementet i listen over elementer. Programmeringslogikken i process.py kan skrives slik at en annen modul eller kodebit kjøres avhengig av listen over elementer den behandler. Følgende eksempel definerer en prosessor som kan brukes i et ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Scenario 2: Kjør en sekvens med trinn

Når du har en sekvens av trinn som må kjøres parallelt, kan du definere hver sekvens som en uavhengig SageMaker-pipeline. Kjøringen av disse SageMaker-rørledningene kan deretter utløses fra en Lambda-funksjon som er en del av en LambdaStep i overordnet pipeline. Følgende kodestykke illustrerer scenariet der to forskjellige SageMaker-rørledningskjøringer utløses:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

konklusjonen

I dette innlegget diskuterte vi noen beste fremgangsmåter for effektiv bruk og vedlikehold av SageMaker-rørledninger. Vi ga også visse mønstre som du kan ta i bruk mens du designer arbeidsflyter med SageMaker Pipelines, enten du skriver nye pipelines eller migrerer ML-arbeidsflyter fra andre orkestreringsverktøy. For å komme i gang med SageMaker Pipelines for ML arbeidsflytorkestrering, se kodeeksempler på GitHub og Amazon SageMaker Model Building Pipelines.


Om forfatterne

Beste praksis og designmønstre for å bygge arbeidsflyter for maskinlæring med Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Pinak Panigrahi jobber med kunder for å bygge maskinlæringsdrevne løsninger for å løse strategiske forretningsproblemer på AWS. Når han ikke er opptatt med maskinlæring, kan han bli funnet å ta en fottur, lese en bok eller se på sport.

Beste praksis og designmønstre for å bygge arbeidsflyter for maskinlæring med Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Meenakshisundaram Thandavarayan jobber for AWS som AI/ML-spesialist. Han har en lidenskap for å designe, skape og fremme menneskesentrerte data- og analyseopplevelser. Meena fokuserer på å utvikle bærekraftige systemer som leverer målbare, konkurransefortrinn for strategiske kunder av AWS. Meena er en kobling, designtenker og streber etter å drive virksomheten til nye måter å jobbe på gjennom innovasjon, inkubasjon og demokratisering.

Tidstempel:

Mer fra AWS maskinlæring