תזמר תהליכי עבודה מבוססי Ray באמצעות Amazon SageMaker | שירותי האינטרנט של אמזון

תזמר תהליכי עבודה מבוססי Ray באמצעות Amazon SageMaker | שירותי האינטרנט של אמזון

למידת מכונה (ML) הופכת מורכבת יותר ויותר ככל שלקוחות מנסים לפתור בעיות מאתגרות יותר ויותר. מורכבות זו מובילה לעתים קרובות לצורך ב-ML מבוזר, שבו משתמשים במספר מכונות לאימון דגם בודד. למרות שזה מאפשר הקבילה של משימות על פני מספר צמתים, מה שמוביל לזמני אימון מואצים, מדרגיות משופרת ושיפור ביצועים, ישנם אתגרים משמעותיים בשימוש יעיל בחומרה מבוזרת. מדעני נתונים צריכים להתמודד עם אתגרים כמו חלוקת נתונים, איזון עומסים, סבילות לתקלות ומדרגיות. מהנדסי ML חייבים לטפל בהקבלה, תזמון, תקלות וניסיונות חוזרים באופן ידני, הדורשים קוד תשתית מורכב.

בפוסט זה נדון ביתרונות השימוש ריי ו אמזון SageMaker עבור ML מבוזר, וספק מדריך שלב אחר שלב כיצד להשתמש במסגרות אלו כדי לבנות ולפרוס זרימת עבודה ML ניתנת להרחבה.

Ray, מסגרת מחשוב מבוזר בקוד פתוח, מספקת מסגרת גמישה להדרכה מבוזרת והגשה של מודלים של ML. הוא מפשט פרטי מערכת מבוזרת ברמה נמוכה באמצעות ספריות פשוטות וניתנות להרחבה עבור משימות ML נפוצות כגון עיבוד מקדים של נתונים, הדרכה מבוזרת, כוונון היפרפרמטרים, למידת חיזוק והגשת מודלים.

SageMaker הוא שירות מנוהל במלואו לבנייה, הדרכה ופריסה של מודלים של ML. Ray משתלב בצורה חלקה עם תכונות SageMaker כדי לבנות ולפרוס עומסי עבודה מורכבים של ML שהם גם יעילים ואמינים. השילוב של Ray ו-SageMaker מספק יכולות מקצה לקצה עבור זרימות עבודה מדרגיות של ML, ויש לו את התכונות המודגשות הבאות:

  • שחקנים מבוזרים ומבני מקביליות ב-Ray מפשטים את הפיתוח של אפליקציות מבוזרות.
  • Ray AI Runtime (AIR) מפחית את החיכוך של מעבר מפיתוח לייצור. עם Ray ו-AIR, אותו קוד Python יכול להתרחב בצורה חלקה ממחשב נייד לאשכול גדול.
  • התשתית המנוהלת של SageMaker ותכונות כמו עבודות עיבוד, עבודות הדרכה ועבודות כוונון היפרפרמטרים יכולים להשתמש בספריות Ray מתחת למחשוב מבוזר.
  • ניסויים באמזון SageMaker מאפשר איטרציה מהירה ומעקב אחר ניסויים.
  • חנות הפונקציות של אמזון SageMaker מספק מאגר ניתן להרחבה לאחסון, אחזור ושיתוף של תכונות ML לאימון מודלים.
  • ניתן לאחסן, לבצע גרסאות ולעקוב אחר דגמים מאומנים רישום הדגמים של אמזון SageMaker לממשל ולניהול.
  • צינורות SageMaker של אמזון מאפשר לתזמן את מחזור החיים של ML מקצה לקצה מהכנת נתונים והדרכה ועד לפריסת מודל כזרימות עבודה אוטומטיות.

סקירת פתרונות

פוסט זה מתמקד ביתרונות השימוש ב-Ray ו- SageMaker יחד. הקמנו זרימת עבודה ML מבוססת Ray מקצה לקצה, המתוזמרת באמצעות SageMaker Pipelines. זרימת העבודה כוללת הטמעה מקבילה של נתונים לתוך מאגר התכונות באמצעות שחקני Ray, עיבוד מקדים של נתונים עם Ray Data, מודלים של אימון וכוונון היפרפרמטרים בקנה מידה באמצעות עבודות כוונון Ray Train ו-HPO, ולבסוף הערכת מודל ורישום המודל לתוך רישום דגמים.

עבור הנתונים שלנו, אנו משתמשים מערך נתונים של דיור סינטטי המורכב משמונה תכונות (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, ו DECK) והמודל שלנו יחזה את PRICE של הבית.

כל שלב בזרימת העבודה של ML מחולק לשלבים נפרדים, עם סקריפט משלו שלוקח פרמטרים של קלט ופלט. בסעיף הבא, נדגיש קטעי קוד מפתח מכל שלב. את הקוד המלא ניתן למצוא ב- aws-samples-for-ray מאגר GitHub.

תנאים מוקדמים

כדי להשתמש ב- SageMaker Python SDK ולהפעיל את הקוד המשויך לפוסט זה, אתה צריך את התנאים המוקדמים הבאים:

הטמעת נתונים לתוך SageMaker Feature Store

השלב הראשון בזרימת העבודה של ML הוא לקרוא את קובץ נתוני המקור ממנו שירות אחסון פשוט של אמזון (Amazon S3) בפורמט CSV והטמע אותו לתוך SageMaker Feature Store. SageMaker Feature Store הוא מאגר ייעודי המקל על צוותים ליצור, לשתף ולנהל תכונות ML. זה מפשט את הגילוי, השימוש החוזר והשיתוף של תכונות, מה שמוביל לפיתוח מהיר יותר, הגברת שיתוף הפעולה בתוך צוותי לקוחות והפחתת עלויות.

הטמעת תכונות לתוך מאגר התכונות מכילה את השלבים הבאים:

  1. הגדר קבוצת תכונות וצור את קבוצת התכונות בחנות התכונות.
  2. הכן את נתוני המקור עבור מאגר התכונות על ידי הוספת זמן אירוע ומזהה רשומה עבור כל שורת נתונים.
  3. הכנס את הנתונים המוכנים לקבוצת התכונות על ידי שימוש ב-Boto3 SDK.

בחלק זה, אנו מדגישים רק את שלב 3, מכיוון שזהו החלק הכולל עיבוד מקביל של משימת הבליעה באמצעות Ray. אתה יכול לעיין בקוד המלא עבור תהליך זה ב- GitHub ריפו.

השמיים ingest_features מתודה מוגדרת בתוך מחלקה שנקראת Featurestore. שים לב כי Featurestore הכיתה מעוטרת ב @ray.remote. זה מצביע על כך שמופע של מחלקה זו הוא שחקן ריי, יחידת חישוב ממלכתית ובמקביל בתוך ריי. זהו מודל תכנות המאפשר לך ליצור אובייקטים מבוזרים השומרים על מצב פנימי וניתן לגשת אליהם במקביל על ידי מספר משימות הפועלות בצמתים שונים באשכול Ray. שחקנים מספקים דרך לנהל ולהטמיע את המצב הנתון לשינוי, מה שהופך אותם לבעלי ערך לבניית יישומים מורכבים ומצביים בסביבה מבוזרת. אתה יכול לציין דרישות משאבים גם אצל שחקנים. במקרה זה, כל מופע של ה FeatureStore המחלקה תדרוש 0.5 מעבדים. ראה את הקוד הבא:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

אתה יכול ליצור אינטראקציה עם השחקן על ידי התקשרות ל- remote מַפעִיל. בקוד הבא, מספר השחקנים הרצוי מועבר כארגומנט קלט לתסריט. הנתונים מחולקים לאחר מכן על סמך מספר השחקנים ומועברים לתהליכים המקבילים המרוחקים כדי שיוכלו לתוך מאגר התכונות. אתה יכול להתקשר get על האובייקט ref כדי לחסום את ביצוע המשימה הנוכחית עד להשלמת החישוב המרוחק והתוצאה תהיה זמינה. כשהתוצאה תהיה זמינה, ray.get יחזיר את התוצאה, וביצוע המשימה הנוכחית ימשיך.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

הכן נתונים להדרכה, אימות ובדיקות

בשלב זה, אנו משתמשים ב-Ray Dataset כדי לפצל, לשנות ולהתאים ביעילות את מערך הנתונים שלנו כהכנה ללמידת מכונה. Ray Dataset מספק דרך סטנדרטית לטעינת נתונים מבוזרים לתוך Ray, התומכת במערכות אחסון ופורמטים שונים של קבצים. יש לו ממשקי API לפעולות עיבוד מקדים של נתוני ML כמו טרנספורמציות מקבילות, ערבוב, קיבוץ וצבירה. Ray Dataset מטפל גם בפעולות הדורשות הגדרה מתקנת והאצת GPU. זה משתלב בצורה חלקה עם ספריות עיבוד נתונים אחרות כמו Spark, Pandas, NumPy ועוד, כמו גם עם מסגרות ML כמו TensorFlow ו- PyTorch. זה מאפשר לבנות צינורות נתונים מקצה לקצה וזרימות עבודה של ML על גבי Ray. המטרה היא להקל על עיבוד נתונים מבוזר ו-ML עבור מתרגלים וחוקרים.

בואו נסתכל על קטעים מהסקריפטים שמבצעים עיבוד מקדים זה של הנתונים. אנו מתחילים בטעינת הנתונים מחנות התכונות:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

לאחר מכן אנו מפצלים ומדרגים נתונים באמצעות ההפשטות ברמה גבוהה יותר הזמינות מה- ray.data הספריה:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

מערכי הרכבת, האימות והבדיקה המעובדים מאוחסנים באמזון S3 ויעברו כפרמטרי הקלט לשלבים הבאים.

בצע אימון מודלים ואופטימיזציה של היפרפרמטרים

כשהנתונים שלנו מעובדים מראש ומוכנים למידול, הגיע הזמן לאמן כמה מודלים של ML ולכוונן עדין את הפרמטרים ההיפרמטרים שלהם כדי למקסם את הביצועים החזויים. אנו משתמשים XGBoost-Ray, backend מבוזר עבור XGBoost בנוי על Ray המאפשר אימון מודלים של XGBoost על מערכי נתונים גדולים על ידי שימוש במספר צמתים ו-GPUs. היא מספקת תחליפים פשוטים של מערכת ה-API של XGBoost וחזויה תוך טיפול במורכבות של ניהול נתונים מבוזרים והדרכה מתחת למכסה המנוע.

כדי לאפשר הפצה של האימון על מספר צמתים, אנו משתמשים בכיתה עוזרת בשם RayHelper. כפי שמוצג בקוד הבא, אנו משתמשים בתצורת המשאבים של עבודת ההדרכה ובוחרים את המארח הראשון כצומת הראש:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

אנו יכולים להשתמש במידע המארח כדי להחליט כיצד לאתחל את Ray בכל אחד ממופעי עבודת ההדרכה:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

כאשר מתחילה עבודת אימון, ניתן לאתחל אשכול Ray על ידי קריאה ל- start_ray() שיטה על מופע של RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

לאחר מכן אנו משתמשים במאמן XGBoost מבית XGBoost-Ray לאימון:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

שימו לב שבזמן יצירת מופע של trainer, אנחנו עוברים RayParams, שלוקח את מספר השחקנים ומספר המעבדים לכל שחקנים. XGBoost-Ray משתמש במידע זה כדי להפיץ את האימון על פני כל הצמתים המחוברים לאשכול ה-Ray.

כעת אנו יוצרים אובייקט אומדן XGBoost המבוסס על SageMaker Python SDK ומשתמשים בו עבור עבודת HPO.

תזמר את השלבים הקודמים באמצעות SageMaker Pipelines

כדי לבנות זרימת עבודה ML הניתנת להרחבה וניתנת לשימוש חוזר מקצה לקצה, עלינו להשתמש בכלי CI/CD כדי לתזמר את השלבים הקודמים לצינור. ל- SageMaker Pipelines יש אינטגרציה ישירה עם SageMaker, SageMaker Python SDK ו- SageMaker Studio. אינטגרציה זו מאפשרת לך ליצור זרימות עבודה של ML עם Python SDK קל לשימוש, ולאחר מכן להמחיש ולנהל את זרימת העבודה שלך באמצעות SageMaker Studio. אתה יכול גם לעקוב אחר ההיסטוריה של הנתונים שלך בתוך ביצוע הצינור ולקבוע שלבים לאחסון במטמון.

SageMaker Pipelines יוצר גרף אציקלי מכוון (DAG) הכולל שלבים הדרושים לבניית זרימת עבודה של ML. כל צינור הוא סדרה של שלבים מחוברים המתוזמרים על ידי תלות בנתונים בין שלבים, וניתן להגדיר אותו בפרמטרים, מה שמאפשר לך לספק משתני קלט כפרמטרים עבור כל ריצה של הצינור. ל- SageMaker Pipelines יש ארבעה סוגים של פרמטרים של צינור: ParameterString, ParameterInteger, ParameterFloat, ו ParameterBoolean. בסעיף זה, אנו מפרמטרים כמה ממשתני הקלט ומגדירים את תצורת מטמון השלבים:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

אנו מגדירים שני שלבי עיבוד: אחד להטמעת SageMaker Feature Store, השני להכנת נתונים. זה אמור להיראות דומה מאוד לשלבים הקודמים שתוארו קודם לכן. שורת הקוד החדשה היחידה היא ה ProcessingStep לאחר הגדרת השלבים, המאפשרת לנו לקחת את תצורת עבודת העיבוד ולכלול אותה כשלב צינור. אנו מציינים עוד את התלות של שלב הכנת הנתונים בשלב ההטמעה של SageMaker Feature Store. ראה את הקוד הבא:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

באופן דומה, כדי לבנות שלב אימון וכיוונון מודל, עלינו להוסיף הגדרה של TuningStep לאחר הקוד של שלב אימון המודל כדי לאפשר לנו להפעיל כוונון היפרפרמטרים של SageMaker כשלב בצנרת:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

לאחר שלב הכוונון, אנו בוחרים לרשום את הדגם הטוב ביותר ל- SageMaker Model Registry. כדי לשלוט באיכות המודל, אנו מיישמים שער איכות מינימלי שמשווה את המדד האובייקטיבי של המודל הטוב ביותר (RMSE) מול סף המוגדר כפרמטר הקלט של הצינור rmse_threshold. כדי לבצע הערכה זו, אנו יוצרים שלב עיבוד נוסף להפעלת תסריט הערכה. תוצאת הערכת המודל תישמר כקובץ נכסים. קבצי מאפיינים שימושיים במיוחד בעת ניתוח התוצאות של שלב עיבוד כדי להחליט כיצד יש להפעיל שלבים אחרים. ראה את הקוד הבא:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

אנו מגדירים א ModelStep כדי לרשום את הדגם הטוב ביותר ל- SageMaker Model Registry בצנרת שלנו. במקרה שהדגם הטוב ביותר לא יעבור את בדיקת האיכות שנקבעה מראש, אנו מציינים בנוסף א FailStep כדי להוציא הודעת שגיאה:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

לאחר מכן, אנו משתמשים ב-a ConditionStep כדי להעריך אם שלב רישום המודל או שלב הכישלון צריך להיעשות בשלב הבא בצנרת. במקרה שלנו, הדגם הטוב ביותר יירשם אם ציון ה-RMSE שלו נמוך מהסף.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

לבסוף, אנו מתזמרים את כל השלבים המוגדרים לצינור:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

ניתן להמחיש את הצינור הקודם ולבצע ישירות ב-SageMaker Studio, או להתבצע על ידי קריאה execution = training_pipeline.start(). האיור הבא ממחיש את זרימת הצינור.

SageMaker Pipeline DAG

בנוסף, אנו יכולים לסקור את שושלת החפצים שנוצרו על ידי ביצוע הצינור.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

פרוס את הדגם

לאחר שהדגם הטוב ביותר נרשם ברישום המודלים של SageMaker באמצעות ריצת צינור, אנו פורסים את המודל לנקודת קצה בזמן אמת באמצעות יכולות פריסת המודל המנוהלות במלואן של SageMaker. ל- SageMaker אפשרויות פריסת מודלים אחרות כדי לענות על הצרכים של מקרי שימוש שונים. לפרטים, עיין ב פרוס מודלים להסקת מסקנות בעת בחירת האפשרות הנכונה עבור מקרה השימוש שלך. ראשית, בואו נרשום את הדגם ב- SageMaker Model Registry:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

המצב הנוכחי של הדגם הוא PendingApproval. אנחנו צריכים להגדיר את הסטטוס שלו Approved לפני הפריסה:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

לנקות את

לאחר שתסיים להתנסות, זכור לנקות את המשאבים כדי למנוע חיובים מיותרים. כדי לנקות, מחק את נקודת הקצה, קבוצת המודלים, הצינור וקבוצת התכונות בזמן אמת על ידי קריאה לממשקי ה-API מחק נקודת קצה, מחקModelPackageGroup, מחק Pipeline, ו DeleteFeatureGroup, בהתאמה, וסגור את כל מופעי המחברת של SageMaker Studio.

סיכום

פוסט זה הדגים הדרכה שלב אחר שלב כיצד להשתמש ב- SageMaker Pipelines לתזמור זרימות עבודה מבוססות Ray. הדגמנו גם את היכולת של SageMaker Pipelines להשתלב עם כלי ML של צד שלישי. ישנם שירותי AWS שונים התומכים בעומסי עבודה של Ray בצורה מדרגית ומאובטחת כדי להבטיח ביצועים מצוינים ויעילות תפעולית. כעת, תורכם לחקור את היכולות החזקות הללו ולהתחיל לייעל את זרימות העבודה של למידת המכונה שלכם עם Amazon SageMaker Pipelines ו-Ray. בצע פעולה היום וגלה את מלוא הפוטנציאל של פרויקטי ה-ML שלך!


על המחבר

תזמר תהליכי עבודה מבוססי Ray באמצעות Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.ראג'ו רנגאן הוא ארכיטקט פתרונות בכיר בשירותי האינטרנט של אמזון (AWS). הוא עובד עם גופים בחסות ממשלתית, ועוזר להם לבנות פתרונות AI/ML באמצעות AWS. כשלא מתעסק עם פתרונות ענן, תתפסו אותו מבלה עם המשפחה או מרסק ציפורי משחק תוסס של בדמינטון עם חברים.

תזמר תהליכי עבודה מבוססי Ray באמצעות Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.שרי דינג הוא ארכיטקט בכיר בפתרונות AI/ML ב-Amazon Web Services (AWS). יש לה ניסיון רב בלמידת מכונה עם תואר דוקטור במדעי המחשב. היא עובדת בעיקר עם לקוחות מהמגזר הציבורי על אתגרים עסקיים שונים הקשורים ל-AI/ML, ועוזרת להם להאיץ את מסע למידת המכונה שלהם בענן AWS. כשהיא לא עוזרת ללקוחות, היא נהנית מפעילויות חוצות.

בול זמן:

עוד מ למידת מכונות AWS