Bedste praksis og designmønstre til opbygning af maskinlæringsarbejdsgange med Amazon SageMaker Pipelines | Amazon Web Services

Bedste praksis og designmønstre til opbygning af maskinlæringsarbejdsgange med Amazon SageMaker Pipelines | Amazon Web Services

Amazon SageMaker Pipelines er en fuldt administreret AWS-tjeneste til opbygning og orkestrering af maskinlæring (ML) arbejdsgange. SageMaker Pipelines tilbyder ML-applikationsudviklere muligheden for at orkestrere forskellige trin i ML-workflowet, herunder dataindlæsning, datatransformation, træning, tuning og implementering. Du kan bruge SageMaker Pipelines til at orkestrere ML-job i SageMaker, og dets integration med det større AWS-økosystem giver dig også mulighed for at bruge ressourcer som f.eks AWS Lambda funktioner, Amazon EMR job og meget mere. Dette giver dig mulighed for at bygge en tilpasset og reproducerbar pipeline til specifikke krav i dine ML-arbejdsgange.

I dette indlæg giver vi nogle bedste fremgangsmåder til at maksimere værdien af ​​SageMaker Pipelines og gøre udviklingsoplevelsen problemfri. Vi diskuterer også nogle almindelige designscenarier og -mønstre, når vi bygger SageMaker Pipelines og giver eksempler på, hvordan de kan håndteres.

Bedste praksis for SageMaker Pipelines

I dette afsnit diskuterer vi nogle bedste praksisser, der kan følges, når man designer arbejdsgange ved hjælp af SageMaker Pipelines. Ved at vedtage dem kan det forbedre udviklingsprocessen og strømline den operationelle ledelse af SageMaker Pipelines.

Brug Pipeline Session til doven indlæsning af pipelinen

Pipeline session muliggør doven initialisering af pipelineressourcer (jobbene startes ikke før pipeline-runtime). Det PipelineSession kontekst arver SageMaker Session og implementerer praktiske metoder til at interagere med andre SageMaker-enheder og ressourcer, såsom træningsjob, slutpunkter, inputdatasæt i Amazon Simple Storage Service (Amazon S3) og så videre. Når du definerer SageMaker Pipelines, bør du bruge PipelineSession over den almindelige SageMaker-session:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

Kør pipelines i lokal tilstand for omkostningseffektive og hurtige gentagelser under udvikling

Du kan køre en pipeline i lokal tilstand ved hjælp af LocalPipelineSession sammenhæng. I denne tilstand køres pipeline og jobs lokalt ved hjælp af ressourcer på den lokale maskine i stedet for SageMaker-administrerede ressourcer. Lokal tilstand giver en omkostningseffektiv måde at iterere på pipelinekoden med en mindre delmængde af data. Efter at rørledningen er testet lokalt, kan den skaleres til at køre ved hjælp af PipelineSession sammenhæng.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

Administrer en SageMaker-pipeline gennem versionering

Versionering af artefakter og pipelinedefinitioner er et almindeligt krav i udviklingens livscyklus. Du kan oprette flere versioner af pipelinen ved at navngive pipeline-objekter med et unikt præfiks eller suffiks, hvor den mest almindelige er et tidsstempel, som vist i følgende kode:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

Organiser og spor SageMaker-pipelinekørsler ved at integrere med SageMaker Experiments

SageMaker Pipelines kan nemt integreres med SageMaker-eksperimenter til at organisere og sporing af rørledninger. Dette opnås ved at specificere PipelineExperimentConfig på tidspunktet for oprettelse af en rørledningsobjekt. Med dette konfigurationsobjekt kan du angive et eksperimentnavn og et prøvenavn. Kørselsdetaljerne for en SageMaker-pipeline bliver organiseret under det angivne eksperiment og forsøg. Hvis du ikke udtrykkeligt angiver et eksperimentnavn, bruges et pipelinenavn til eksperimentnavnet. På samme måde, hvis du ikke eksplicit angiver et prøvenavn, bruges et pipeline-kørsels-id til prøve- eller kørselsgruppenavnet. Se følgende kode:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

Kør sikkert SageMaker-pipelines i en privat VPC

For at sikre ML-arbejdsbelastningerne er det en god praksis at implementere de job, der er orkestreret af SageMaker Pipelines, i en sikker netværkskonfiguration inden for en privat VPC, private undernet og sikkerhedsgrupper. For at sikre og håndhæve brugen af ​​dette sikre miljø kan du implementere følgende AWS identitets- og adgangsstyring (IAM) politik for SageMaker udførelsesrolle (dette er den rolle, som rørledningen påtager sig under dens kørsel). Du kan også tilføje politikken til at køre de job, der er orkestreret af SageMaker Pipelines i netværksisoleringstilstand.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

For et eksempel på pipelineimplementering med disse sikkerhedskontroller på plads, se Orkestrering af job, modelregistrering og kontinuerlig implementering med Amazon SageMaker i et sikkert miljø.

Overvåg omkostningerne ved pipeline-kørsler ved hjælp af tags

Det er gratis at bruge SageMaker pipelines i sig selv; du betaler for de beregnings- og lagerressourcer, du samler op som en del af de individuelle pipeline-trin som behandling, træning og batch-inferens. For at aggregere omkostningerne pr. pipelinekørsel kan du inkludere tags i hvert pipelinetrin, der skaber en ressource. Disse tags kan derefter henvises til i cost explorer for at filtrere og aggregere de samlede pipelinekørselsomkostninger, som vist i følgende eksempel:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

Fra cost explorer kan du nu få omkostningerne filtreret efter tagget:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

Design mønstre til nogle almindelige scenarier

I dette afsnit diskuterer vi designmønstre for nogle almindelige anvendelsestilfælde med SageMaker Pipelines.

Kør en letvægts Python-funktion ved hjælp af et Lambda-trin

Python-funktioner er allestedsnærværende i ML-arbejdsgange; de bruges i forbehandling, efterbehandling, evaluering og mere. Lambda er en serverløs computertjeneste, der lader dig køre kode uden at klargøre eller administrere servere. Med Lambda kan du køre kode på dit foretrukne sprog, der inkluderer Python. Du kan bruge dette til at køre tilpasset Python-kode som en del af din pipeline. Et Lambda-trin giver dig mulighed for at køre Lambda-funktioner som en del af din SageMaker-pipeline. Start med følgende kode:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

Opret Lambda-funktionen ved hjælp af SageMaker Python SDK's Lambda-hjælper:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

Kald Lambda-trinnet:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

Send data mellem trin

Inputdata for et pipelinetrin er enten en tilgængelig dataplacering eller data genereret af et af de foregående trin i pipelinen. Du kan give disse oplysninger som en ProcessingInput parameter. Lad os se på et par scenarier for, hvordan du kan bruge ProcessingInput.

Scenarie 1: Overfør outputtet (primitive datatyper) fra et Lambda-trin til et behandlingstrin

Primitive datatyper refererer til skalære datatyper som streng, heltal, Boolean og float.

Følgende kodestykke definerer en Lambda-funktion, der returnerer en ordbog over variabler med primitive datatyper. Din Lambda-funktionskode returnerer en JSON af nøgleværdi-par, når den kaldes fra Lambda-trinnet i SageMaker-pipelinen.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

I pipeline-definitionen kan du derefter definere SageMaker-pipeline-parametre, der er af en specifik datatype og indstille variablen til output fra Lambda-funktionen:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

Scenarie 2: Send output (ikke-primitive datatyper) fra et lambda-trin til et behandlingstrin

Ikke-primitive datatyper henviser til ikke-skalære datatyper (f.eks. NamedTuple). Du kan have et scenarie, hvor du skal returnere en ikke-primitiv datatype fra en Lambda-funktion. For at gøre dette skal du konvertere din ikke-primitive datatype til en streng:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Så kan du bruge denne streng som input til et efterfølgende trin i pipelinen. For at bruge den navngivne tuple i koden, brug eval() for at parse Python-udtrykket i strengen:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

Scenarie 3: Send output fra et trin gennem en egenskabsfil

Du kan også gemme outputtet fra et behandlingstrin i en ejendom JSON-fil til downstream-forbrug i en ConditionStep eller en anden ProcessingStep. Du kan bruge JSONGet funktion at forespørge a ejendomsfil. Se følgende kode:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

Lad os antage, at ejendomsfilens indhold var følgende:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

I dette tilfælde kan den forespørges for en bestemt værdi og bruges i efterfølgende trin ved hjælp af JsonGet-funktionen:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

Parametriser en variabel i pipeline definition

Parametrisering af variabler, så de kan bruges under kørsel, er ofte ønskeligt - for eksempel for at konstruere en S3 URI. Du kan parametrere en streng, således at den evalueres ved kørsel ved hjælp af Join fungere. Følgende kodestykke viser, hvordan man definerer variablen ved hjælp af Join funktion og brug den til at indstille outputplaceringen i et behandlingstrin:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

Kør parallel kode over en iterabel

Nogle ML-arbejdsgange kører kode i parallelle for-loops over et statisk sæt elementer (en Iterable). Det kan enten være den samme kode, der bliver kørt på forskellige data eller et andet stykke kode, der skal køres for hvert element. For eksempel, hvis du har et meget stort antal rækker i en fil og ønsker at fremskynde behandlingstiden, kan du stole på det tidligere mønster. Hvis du vil udføre forskellige transformationer på specifikke undergrupper i dataene, skal du muligvis køre et andet stykke kode for hver undergruppe i dataene. De følgende to scenarier illustrerer, hvordan du kan designe SageMaker-pipelines til dette formål.

Scenarie 1: Implementer en behandlingslogik på forskellige dele af data

Du kan køre et behandlingsjob med flere forekomster (ved at indstille instance_count til en værdi større end 1). Dette distribuerer inputdataene fra Amazon S3 til alle behandlingsforekomster. Du kan derefter bruge et script (process.py) til at arbejde på en bestemt del af dataene baseret på instansnummeret og det tilsvarende element på listen over elementer. Programmeringslogikken i process.py kan skrives sådan, at et andet modul eller kodestykke køres afhængigt af listen over elementer, som det behandler. Følgende eksempel definerer en processor, der kan bruges i et ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

Scenarie 2: Kør en række trin

Når du har en sekvens af trin, der skal køres parallelt, kan du definere hver sekvens som en uafhængig SageMaker-pipeline. Kørslen af ​​disse SageMaker-rørledninger kan derefter udløses fra en Lambda-funktion, der er en del af en LambdaStep i moderrørledningen. Følgende kodestykke illustrerer scenariet, hvor to forskellige SageMaker-pipelinekørsler udløses:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

Konklusion

I dette indlæg diskuterede vi nogle bedste praksisser for effektiv brug og vedligeholdelse af SageMaker-rørledninger. Vi leverede også visse mønstre, som du kan anvende, mens du designer arbejdsgange med SageMaker Pipelines, uanset om du opretter nye pipelines eller migrerer ML-arbejdsgange fra andre orkestreringsværktøjer. For at komme i gang med SageMaker Pipelines til ML workflow orkestrering, se kodeeksempler på GitHub , Amazon SageMaker Model Building Pipelines.


Om forfatterne

Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pinak Panigrahi arbejder sammen med kunder om at bygge maskinlæringsdrevne løsninger til at løse strategiske forretningsproblemer på AWS. Når han ikke er beskæftiget med maskinlæring, kan han blive fundet på at tage en vandretur, læse en bog eller se sport.

Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Meenakshisundaram Thandavarayan arbejder for AWS som AI/ML Specialist. Han har en passion for at designe, skabe og fremme menneskecentrerede data- og analyseoplevelser. Meena fokuserer på at udvikle bæredygtige systemer, der leverer målbare, konkurrencemæssige fordele for strategiske kunder af AWS. Meena er en connector, designtænker og stræber efter at drive forretning til nye måder at arbejde på gennem innovation, inkubation og demokratisering.

Tidsstempel:

Mere fra AWS maskinindlæring