שיטות עבודה מומלצות ודפוסי עיצוב לבניית תהליכי עבודה של למידת מכונה עם Amazon SageMaker Pipelines | שירותי האינטרנט של אמזון

שיטות עבודה מומלצות ודפוסי עיצוב לבניית תהליכי עבודה של למידת מכונה עם Amazon SageMaker Pipelines | שירותי האינטרנט של אמזון

צינורות SageMaker של אמזון הוא שירות AWS מנוהל במלואו לבנייה ותזמור של תהליכי עבודה של למידת מכונה (ML). SageMaker Pipelines מציעה למפתחי יישומי ML את היכולת לתזמן שלבים שונים של זרימת העבודה של ML, כולל טעינת נתונים, שינוי נתונים, הדרכה, כוונון ופריסה. אתה יכול להשתמש ב- SageMaker Pipelines כדי לתזמן עבודות ML ב- SageMaker, ושלו אינטגרציה עם מערכת האקולוגית הגדולה יותר של AWS גם מאפשר לך להשתמש במשאבים כמו AWS למבדה פונקציות, אמזון EMR מקומות עבודה ועוד. זה מאפשר לך לבנות צינור מותאם וניתן לשחזור עבור דרישות ספציפיות בזרימות העבודה שלך ב-ML.

בפוסט זה, אנו מספקים כמה שיטות עבודה מומלצות כדי למקסם את הערך של SageMaker Pipelines ולהפוך את חווית הפיתוח לחלקה. אנו גם דנים בכמה תרחישים ותבניות עיצוב נפוצים בעת בניית צינורות SageMaker ומספקים דוגמאות לטיפול בהם.

שיטות עבודה מומלצות עבור SageMaker Pipelines

בסעיף זה, אנו דנים בכמה שיטות עבודה מומלצות שניתן לבצע בעת עיצוב זרימות עבודה באמצעות SageMaker Pipelines. אימוץ שלהם יכול לשפר את תהליך הפיתוח ולייעל את הניהול התפעולי של SageMaker Pipelines.

השתמש ב-Pipeline Session לטעינה עצלנית של הצינור

מושב צינור מאפשר אתחול עצלן של משאבי צינור (העבודות לא מתחילות עד לזמן הריצה של הצינור). ה PipelineSession ההקשר יורש את סשן של SageMaker ומיישם שיטות נוחות לאינטראקציה עם ישויות ומשאבים אחרים של SageMaker, כגון עבודות הדרכה, נקודות קצה, מערכי נתונים שירות אחסון פשוט של אמזון (Amazon S3), וכן הלאה. בעת הגדרת SageMaker Pipelines, עליך להשתמש PipelineSession במהלך הפגישה הרגילה של SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge’, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=pipeline_session,
)

הפעל צינורות במצב מקומי עבור איטרציות חסכוניות ומהירות במהלך הפיתוח

אתה יכול להפעיל א צינור במצב מקומי באמצעות LocalPipelineSession הֶקשֵׁר. במצב זה, הצינור והעבודות מופעלות באופן מקומי באמצעות משאבים במחשב המקומי, במקום משאבים מנוהלים של SageMaker. מצב מקומי מספק דרך חסכונית לחזור על קוד הצינור עם תת-קבוצה קטנה יותר של נתונים. לאחר שהצינור נבדק באופן מקומי, ניתן לשנות את קנה המידה להפעלה באמצעות ה- PipelineSession ההקשר.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, sagemaker_session=local_pipeline_session,
)

נהל צנרת של SageMaker באמצעות ניהול גרסאות

גירסאות של חפצים והגדרות צינור היא דרישה נפוצה במחזור החיים של הפיתוח. אתה יכול ליצור גרסאות מרובות של הצינור על ידי מתן שם לאובייקטי צינור עם קידומת או סיומת ייחודית, הנפוצה ביותר היא חותמת זמן, כפי שמוצג בקוד הבא:

from sagemaker.workflow.pipeline_context import PipelineSession
import time current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline( name=pipeline_name, steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session,
)

ארגן ועקוב אחר ריצות צנרת של SageMaker על ידי שילוב עם SageMaker Experiments

ניתן לשלב בקלות עם צינורות SageMaker SageMaker ניסויים לארגון ו מעקב אחר ריצות צינור. זה מושג על ידי ציון PipelineExperimentConfig בזמן יצירת א אובייקט צינור. עם אובייקט תצורה זה, אתה יכול לציין שם ניסוי ושם ניסוי. פרטי הריצה של צינור SageMaker מתארגנים תחת הניסוי והניסוי שצוינו. אם אינך מציין במפורש שם ניסוי, שם צינור ישמש לשם הניסוי. באופן דומה, אם אינך מציין במפורש שם ניסיון, ישמש מזהה הפעלת צינור עבור שם קבוצת הניסיון או ההפעלה. ראה את הקוד הבא:

Pipeline( name="MyPipeline", parameters=[...], pipeline_experiment_config=PipelineExperimentConfig( experiment_name = ExecutionVariables.PIPELINE_NAME, trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID ), steps=[...]
)

הפעל בצורה מאובטחת צינורות SageMaker בתוך VPC פרטי

כדי לאבטח את עומסי העבודה של ML, שיטת העבודה הטובה ביותר היא לפרוס את העבודות המתוזמרות על ידי SageMaker Pipelines בתצורת רשת מאובטחת בתוך VPC פרטי, רשתות משנה פרטיות וקבוצות אבטחה. כדי להבטיח ולאכוף את השימוש בסביבה מאובטחת זו, תוכל ליישם את הדברים הבאים AWS זהות וניהול גישה (IAM) מדיניות עבור תפקיד ביצוע של SageMaker (זהו התפקיד שנוטל על עצמו הצינור במהלך הריצה שלו). אתה יכול גם להוסיף את המדיניות כדי להפעיל את העבודות שמתוזמרות על ידי SageMaker Pipelines במצב בידוד רשת.

# IAM Policy to enforce execution within a private VPC { "Action": [ "sagemaker:CreateProcessingJob", "sagemaker:CreateTrainingJob", "sagemaker:CreateModel" ], "Resource": "*", "Effect": "Deny", "Condition": { "Null": { "sagemaker:VpcSubnets": "true" } }
} # IAM Policy to enforce execution in network isolation mode
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Deny", "Action": [ "sagemaker:Create*" ], "Resource": "*", "Condition": { "StringNotEqualsIfExists": { "sagemaker:NetworkIsolation": "true" } } } ]
}

לדוגמא של יישום צינור עם בקרות אבטחה אלה במקום, עיין ב תזמור משרות, רישום דגמים ופריסה רציפה עם Amazon SageMaker בסביבה מאובטחת.

עקוב אחר עלות ריצות הצינור באמצעות תגים

השימוש בצנרת של SageMaker כשלעצמו הוא בחינם; אתה משלם עבור משאבי המחשוב והאחסון שאתה מגבש כחלק משלבי הצינור הבודדים כמו עיבוד, הדרכה והסקת אצווה. כדי לצבור את העלויות לכל הפעלת צינור, אתה יכול לכלול תיוגים בכל שלב בצינור שיוצר משאב. לאחר מכן ניתן להפנות לתגים אלה בסייר העלויות כדי לסנן ולצבור את עלות הפעלת הצינור הכוללת, כפי שמוצג בדוגמה הבאה:

sklearn_processor = SKLearnProcessor( framework_version=’0.20.0’, instance_type=’ml.m5.xlarge, instance_count=1, base_job_name="sklearn-abalone-process", role=role, tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
) step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, ...
)

מסייר העלויות, כעת תוכל לסנן את העלות באמצעות התג:

response = client.get_cost_and_usage( TimePeriod={ 'Start': '2023-07-01', 'End': '2023-07-15' }, Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'], Granularity='MONTHLY', Filter={ 'Dimensions': { 'Key':'USAGE_TYPE', 'Values': [ ‘SageMaker:Pipeline’ ] }, 'Tags': { 'Key': 'keyName', 'Values': [ 'keyValue', ] } }
)

עיצוב דפוסים עבור כמה תרחישים נפוצים

בחלק זה, אנו דנים בדפוסי עיצוב עבור כמה מקרי שימוש נפוצים עם SageMaker Pipelines.

הפעל פונקציית Python קלת משקל באמצעות צעד Lambda

פונקציות Python נמצאות בכל מקום בזרימות עבודה של ML; הם משמשים בעיבוד מקדים, לאחר עיבוד, הערכה ועוד. Lambda הוא שירות מחשוב ללא שרתים המאפשר לך להריץ קוד מבלי להקצות או לנהל שרתים. עם Lambda, אתה יכול להריץ קוד בשפה המועדפת עליך הכולל את Python. אתה יכול להשתמש בזה כדי להפעיל קוד Python מותאם אישית כחלק מהצינור שלך. צעד למבדה מאפשר לך להפעיל פונקציות Lambda כחלק מצינור SageMaker שלך. התחל עם הקוד הבא:

%%writefile lambdafunc.py import json def lambda_handler(event, context): str1 = event["str1"] str2 = event["str2"] str3 = str1 + str2 return { "str3": str3 }

צור את פונקציית Lambda באמצעות ה- עוזר Lambda של SageMaker Python SDK:

from sagemaker.lambda_helper import Lambda def create_lambda(function_name, script, handler): response = Lambda( function_name=function_name, execution_role_arn=role, script= script, handler=handler, timeout=600, memory_size=10240, ).upsert() function_arn = response['FunctionArn'] return function_arn fn_arn = create_Lambda("func", "lambdafunc.py", handler = "lambdafunc.lambda_handler")

קרא לשלב למבדה:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
) str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String) # Lambda Step
step_lambda1 = LambdaStep( name="LambdaStep1", lambda_func=Lambda( function_arn=fn_arn ), inputs={ "str1": "Hello", "str2": " World" }, outputs=[str3],
)

העבר נתונים בין שלבים

נתוני קלט עבור שלב צינור הם מיקום נתונים נגיש או נתונים שנוצרו על ידי אחד מהשלבים הקודמים בצינור. אתה יכול לספק מידע זה בתור א ProcessingInput פָּרָמֶטֶר. בואו נסתכל על כמה תרחישים של איך אתה יכול להשתמש ProcessingInput.

תרחיש 1: העבר את הפלט (סוגי נתונים פרימיטיביים) של שלב למבדה לשלב עיבוד

סוגי נתונים פרימיטיביים מתייחסים לסוגי נתונים סקלרים כמו מחרוזת, מספר שלם, בוליאנית וצוף.

קטע הקוד הבא מגדיר פונקציית Lambda שמחזירה מילון של משתנים עם סוגי נתונים פרימיטיביים. קוד פונקציית Lambda שלך יחזיר JSON של צמדי מפתח-ערך כאשר יופעל משלב Lambda בתוך צינור SageMaker.

def handler(event, context): ... return { "output1": "string_value", "output2": 1, "output3": True, "output4": 2.0, }

בהגדרת הצינור, לאחר מכן תוכל להגדיר פרמטרים של צינור של SageMaker שהם מסוג נתונים ספציפי ולהגדיר את המשתנה לפלט של פונקציית Lambda:

from sagemaker.workflow.lambda_step import ( LambdaStep, LambdaOutput, LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor role = sagemaker.get_execution_role()
pipeline_session = PipelineSession() # 1. Define the output params of the Lambda Step str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name"output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name"output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name"output4", output_type=LambdaOutputTypeEnum.Float) # 2. Lambda step invoking the lambda function and returns the Output step_lambda = LambdaStep( name="MyLambdaStep", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda", session=PipelineSession(), ), inputs={"arg1": "foo", "arg2": "foo1"}, outputs=[ str_outputParam, int_outputParam, bool_outputParam, float_outputParam ],
) # 3. Extract the output of the Lambda str_outputParam = step_lambda.properties.Outputs["output1"] # 4. Use it in a subsequent step. For ex. Processing step sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, sagemaker_session=pipeline_session, role=role
) processor_args = sklearn_processor.run( code="code/preprocess.py", #python script to run arguments=["--input-args", str_outputParam]
) step_process = ProcessingStep( name="processstep1", step_args=processor_args,
)

תרחיש 2: העברת הפלט (סוגי נתונים לא פרימיטיביים) של שלב למבדה לשלב עיבוד

סוגי נתונים לא פרימיטיביים מתייחסים לסוגי נתונים לא סקלאריים (לדוגמה, NamedTuple). ייתכן שיש לך תרחיש שבו עליך להחזיר סוג נתונים לא פרימיטיבי מפונקציית Lambda. כדי לעשות זאת, עליך להמיר את סוג הנתונים הלא-פרימיטיביים שלך למחרוזת:

# Lambda function code returning a non primitive data type from collections import namedtuple def lambda_handler(event, context): Outputs = namedtuple("Outputs", "sample_output") named_tuple = Outputs( [ {'output1': 1, 'output2': 2}, {'output3': 'foo', 'output4': 'foo1'} ] )
return{ "named_tuple_string": str(named_tuple)
}

#Pipeline step that uses the Lambda output as a “Parameter Input” output_ref = step_lambda.properties.Outputs["named_tuple_string"]

לאחר מכן תוכל להשתמש במחרוזת זו כקלט לשלב הבא בצנרת. כדי להשתמש ב-tuple עם השם בקוד, השתמש eval() לנתח את ביטוי Python במחרוזת:

# Decipher the string in your processing logic code import argparse
from collections import namedtuple Outputs = namedtuple("Outputs", "sample_output") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--named_tuple_string", type=str, required=True) args = parser.parse_args() #use eval to obtain the named tuple from the string named_tuple = eval(args.named_tuple_string)

תרחיש 3: העבר את הפלט של שלב דרך קובץ מאפיין

אתה יכול גם לאחסן את הפלט של שלב עיבוד ב-a קובץ JSON מאפיין לצריכה במורד הזרם ב א ConditionStep או אחר ProcessingStep. אתה יכול להשתמש ב- פונקציית JSONGet לשאול א קובץ רכוש. ראה את הקוד הבא:

# 1. Define a Processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor( framework_version="0.23-1", instance_type="ml.m5.xlarge", instance_count=1, base_job_name="sklearn-abalone-preprocess", sagemaker_session=session, role=sagemaker.get_execution_role(),
) step_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="hyperparam", source="/opt/ml/processing/evaluation" ), ], code="./local/preprocess.py", arguments=["--input-data", "s3://my-input"],
) # 2. Define a PropertyFile where the output_name matches that with the one used in the Processor

hyperparam_report = PropertyFile( name="AbaloneHyperparamReport", output_name="hyperparam", path="hyperparam.json",
)

נניח שתוכן קובץ הנכסים היה הבא:

{ "hyperparam": { "eta": { "value": 0.6 } }
}

במקרה זה, ניתן לבקש ממנו ערך ספציפי ולהשתמש בו בשלבים הבאים באמצעות הפונקציה JsonGet:

# 3. Query the property file
eta = JsonGet( step_name=step_process.name, property_file=hyperparam_report, json_path="hyperparam.eta.value",
)

פרמטר משתנה בהגדרת צינור

פרמטר של משתנים כך שניתן יהיה להשתמש בהם בזמן ריצה רצוי לעתים קרובות - לדוגמה, כדי לבנות URI S3. אתה יכול להגדיר פרמטרים למחרוזת כך שתוערך בזמן ריצה באמצעות ה- Join פוּנקצִיָה. קטע הקוד הבא מראה כיצד להגדיר את המשתנה באמצעות ה- Join פונקציה והשתמש בזה כדי להגדיר את מיקום הפלט בשלב עיבוד:

# define the variable to store the s3 URI
s3_location = Join( on="/", values=[ "s3:/", ParameterString( name="MyBucket", default_value="" ), "training", ExecutionVariables.PIPELINE_EXECUTION_ID ]
) # define the processing step
sklearn_processor = SKLearnProcessor( framework_version="1.2-1", instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role,
) # use the s3uri as the output location in processing step
processor_run_args = sklearn_processor.run( outputs=[ ProcessingOutput( output_name="train", source="/opt/ml/processing/train", destination=s3_location, ), ], code="code/preprocess.py"
) step_process = ProcessingStep( name="PreprocessingJob”, step_args=processor_run_args,
)

הרץ קוד מקביל על איטרנס

זרימות עבודה מסוימות של ML מרצות קוד במקביל for-loops על קבוצה סטטית של פריטים (an חובה). זה יכול להיות אותו קוד שמופעל על נתונים שונים או קטע קוד אחר שצריך להפעיל עבור כל פריט. לדוגמה, אם יש לך מספר גדול מאוד של שורות בקובץ ואתה רוצה להאיץ את זמן העיבוד, אתה יכול להסתמך על התבנית הקודמת. אם ברצונך לבצע טרנספורמציות שונות על תת-קבוצות ספציפיות בנתונים, ייתכן שתצטרך להריץ פיסת קוד שונה עבור כל תת-קבוצה בנתונים. שני התרחישים הבאים ממחישים כיצד ניתן לתכנן צנרת של SageMaker למטרה זו.

תרחיש 1: הטמעת היגיון עיבוד על חלקים שונים של נתונים

אתה יכול להפעיל עבודת עיבוד עם מספר מופעים (על ידי הגדרה instance_count לערך גדול מ-1). זה מפיץ את נתוני הקלט מאמזון S3 לכל מופעי העיבוד. לאחר מכן תוכל להשתמש בסקריפט (process.py) כדי לעבוד על חלק מסוים מהנתונים בהתבסס על מספר המופע והאלמנט המתאים ברשימת הפריטים. ניתן לכתוב את היגיון התכנות ב-process.py כך שמודול או קטע קוד אחר יופעל בהתאם לרשימת הפריטים שהוא מעבד. הדוגמה הבאה מגדירה מעבד שניתן להשתמש בו בשלב ProcessingStep:

sklearn_processor = FrameworkProcessor( estimator_cls=sagemaker.sklearn.estimator.SKLearn, framework_version="0.23-1", instance_type='ml.m5.4xlarge', instance_count=4, #number of parallel executions / instances base_job_name="parallel-step", sagemaker_session=session, role=role,
) step_args = sklearn_processor.run( code='process.py', arguments=[ "--items", list_of_items, #data structure containing a list of items inputs=[ ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv", destination="/opt/ml/processing/input" ) ], ]
)

תרחיש 2: הפעל רצף של שלבים

כאשר יש לך רצף של שלבים שצריך להפעיל במקביל, אתה יכול להגדיר כל רצף כצנרת עצמאית של SageMaker. לאחר מכן ניתן להפעיל את הריצה של צינורות SageMaker אלה מפונקציית Lambda שהיא חלק מ-a LambdaStep בצנרת האב. קטע הקוד הבא ממחיש את התרחיש שבו מופעלות שתי ריצות צנרת שונות של SageMaker:

import boto3
def lambda_handler(event, context): items = [1, 2] #sagemaker client sm_client = boto3.client("sagemaker") #name of the pipeline that needs to be triggered. #if there are multiple, you can fetch available pipelines using boto3 api #and trigger the appropriate one based on your logic. pipeline_name = 'child-pipeline-1' #trigger pipeline for every item response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), ) pipeline_name = 'child-pipeline-2' response_ppl = sm_client.start_pipeline_execution( PipelineName=pipeline_name, PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s), )
return

סיכום

בפוסט זה, דנו בכמה שיטות עבודה מומלצות לשימוש ותחזוקה יעילים של צינורות SageMaker. סיפקנו גם דפוסים מסוימים שתוכלו לאמץ בזמן עיצוב זרימות עבודה עם SageMaker Pipelines, בין אם אתם יוצרים צינורות חדשים ובין אם אתם מעבירים זרימות עבודה של ML מכלי תזמור אחרים. כדי להתחיל עם SageMaker Pipelines עבור תזמור זרימת עבודה של ML, עיין ב- דוגמאות קוד ב-GitHub ו אמזון SageMaker מודלים לבניית צינורות.


על הכותבים

שיטות עבודה מומלצות ודפוסי עיצוב לבניית תהליכי עבודה של למידת מכונה עם Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.פינאק פניגרהי עובד עם לקוחות לבניית פתרונות מונעי למידת מכונה לפתרון בעיות עסקיות אסטרטגיות ב-AWS. כאשר אינו עוסק בלמידת מכונה, ניתן למצוא אותו מטייל, קורא ספר או צופה בספורט.

שיטות עבודה מומלצות ודפוסי עיצוב לבניית תהליכי עבודה של למידת מכונה עם Amazon SageMaker Pipelines | Amazon Web Services PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.Meenakshisundaram Thandavarayan עובד עבור AWS כמומחה AI/ML. יש לו תשוקה לעצב, ליצור ולקדם חוויות נתונים וחוויות ניתוח הממוקדות באדם. Meena מתמקדת בפיתוח מערכות בנות קיימא המספקות יתרונות מדידים ותחרותיים ללקוחות האסטרטגיים של AWS. Meena היא מחברת, הוגה עיצוב, ושואפת להניע עסקים לדרכים חדשות של עבודה באמצעות חדשנות, דגירה ודמוקרטיזציה.

בול זמן:

עוד מ למידת מכונות AWS