قم بتنسيق سير عمل التعلم الآلي المستند إلى Ray باستخدام Amazon SageMaker | خدمات الويب الأمازون

قم بتنسيق سير عمل التعلم الآلي المستند إلى Ray باستخدام Amazon SageMaker | خدمات الويب الأمازون

أصبح التعلم الآلي (ML) معقدًا بشكل متزايد حيث يحاول العملاء حل المشكلات الصعبة بشكل متزايد. غالبًا ما يؤدي هذا التعقيد إلى الحاجة إلى التعلم الآلي الموزع، حيث يتم استخدام أجهزة متعددة لتدريب نموذج واحد. على الرغم من أن هذا يتيح توازي المهام عبر عقد متعددة، مما يؤدي إلى تسريع أوقات التدريب، وتعزيز قابلية التوسع، وتحسين الأداء، إلا أن هناك تحديات كبيرة في استخدام الأجهزة الموزعة بشكل فعال. يتعين على علماء البيانات مواجهة تحديات مثل تقسيم البيانات، وموازنة التحميل، والتسامح مع الأخطاء، وقابلية التوسع. يجب على مهندسي ML التعامل مع الموازاة والجدولة والأخطاء وإعادة المحاولة يدويًا، مما يتطلب تعليمات برمجية معقدة للبنية التحتية.

في هذه التدوينة نناقش فوائد الاستخدام شعاع و الأمازون SageMaker لتعلم الآلة الموزع، وتوفير دليل خطوة بخطوة حول كيفية استخدام أطر العمل هذه لإنشاء سير عمل تعلم الآلة القابل للتطوير ونشره.

يوفر Ray، وهو إطار عمل حوسبة موزع مفتوح المصدر، إطارًا مرنًا للتدريب الموزع وخدمة نماذج تعلم الآلة. إنه يستخلص تفاصيل النظام الموزع ذات المستوى المنخفض من خلال مكتبات بسيطة وقابلة للتطوير لمهام تعلم الآلة الشائعة مثل المعالجة المسبقة للبيانات، والتدريب الموزع، وضبط المعلمات الفائقة، والتعلم المعزز، وخدمة النماذج.

SageMaker هي خدمة مُدارة بالكامل لبناء نماذج تعلم الآلة والتدريب عليها ونشرها. يتكامل Ray بسلاسة مع ميزات SageMaker لإنشاء ونشر أعباء عمل تعلم الآلة المعقدة التي تتميز بالكفاءة والموثوقية. يوفر الجمع بين Ray وSageMaker إمكانات شاملة لسير عمل تعلم الآلة القابل للتطوير، ويحتوي على الميزات المميزة التالية:

  • تعمل الجهات الفاعلة الموزعة والبنيات المتوازية في Ray على تبسيط تطوير التطبيقات الموزعة.
  • يعمل Ray AI Runtime (AIR) على تقليل الاحتكاك الناتج عن الانتقال من التطوير إلى الإنتاج. باستخدام Ray وAIR، يمكن أن يتغير كود Python نفسه بسلاسة من جهاز كمبيوتر محمول إلى مجموعة كبيرة.
  • يمكن للبنية الأساسية المُدارة لـ SageMaker وميزات مثل مهام المعالجة، ووظائف التدريب، ووظائف ضبط المعلمات الفائقة استخدام مكتبات Ray الموجودة أسفلها للحوسبة الموزعة.
  • تجارب Amazon SageMaker يسمح بالتكرار السريع وتتبع التجارب.
  • متجر ميزات Amazon SageMaker يوفر مستودعًا قابلاً للتطوير لتخزين ميزات تعلم الآلة واسترجاعها ومشاركتها للتدريب النموذجي.
  • يمكن تخزين النماذج المدربة وإصدارها وتتبعها سجل نموذج Amazon SageMaker للحكم والإدارة.
  • خطوط أنابيب Amazon SageMaker يسمح بتنظيم دورة حياة تعلم الآلة من البداية إلى النهاية بدءًا من إعداد البيانات والتدريب وحتى نشر النموذج كمسارات عمل آلية.

حل نظرة عامة

يركز هذا المنشور على فوائد استخدام Ray وSageMaker معًا. قمنا بإعداد سير عمل التعلم الآلي القائم على راي من البداية إلى النهاية، والذي تم تنسيقه باستخدام خطوط أنابيب SageMaker. يتضمن سير العمل استيعابًا متوازيًا للبيانات في مخزن الميزات باستخدام ممثلي Ray، والمعالجة المسبقة للبيانات باستخدام Ray Data، ونماذج التدريب وضبط المعلمات الفائقة على نطاق واسع باستخدام Ray Train ووظائف ضبط تحسين المعلمات الفائقة (HPO)، وأخيرًا تقييم النموذج وتسجيل النموذج في التسجيل النموذجي.

بالنسبة لبياناتنا، نستخدم مجموعة بيانات الإسكان الاصطناعية الذي يتكون من ثماني ميزات (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCHو DECK) وسيتنبأ نموذجنا بـ PRICE من المنزل.

يتم تقسيم كل مرحلة في سير عمل تعلم الآلة إلى خطوات منفصلة، ​​مع البرنامج النصي الخاص بها الذي يأخذ معلمات الإدخال والإخراج. في القسم التالي، نسلط الضوء على مقتطفات التعليمات البرمجية الرئيسية من كل خطوة. يمكن العثور على الكود الكامل على مستودع aws-samples-for-ray GitHub.

المتطلبات الأساسية المسبقة

لاستخدام SageMaker Python SDK وتشغيل التعليمات البرمجية المرتبطة بهذا المنشور، تحتاج إلى المتطلبات الأساسية التالية:

استوعب البيانات في متجر ميزات SageMaker

الخطوة الأولى في سير عمل ML هي قراءة ملف البيانات المصدر منه خدمة تخزين أمازون البسيطة (Amazon S3) بتنسيق CSV واستيعابه في SageMaker Features Store. يعد SageMaker Features Store مستودعًا مصممًا لهذا الغرض، وهو ما يسهل على الفرق إنشاء ميزات تعلم الآلة ومشاركتها وإدارتها. فهو يبسط اكتشاف الميزات وإعادة استخدامها ومشاركتها، مما يؤدي إلى تطوير أسرع وزيادة التعاون بين فرق العملاء وخفض التكاليف.

يتضمن استيعاب الميزات في متجر الميزات الخطوات التالية:

  1. حدد مجموعة ميزات وقم بإنشاء مجموعة الميزات في متجر الميزات.
  2. قم بإعداد البيانات المصدر لمخزن الميزات عن طريق إضافة وقت الحدث ومعرف السجل لكل صف من البيانات.
  3. قم بإدراج البيانات المعدة في مجموعة الميزات باستخدام Boto3 SDK.

في هذا القسم، نسلط الضوء على الخطوة 3 فقط، لأن هذا هو الجزء الذي يتضمن المعالجة المتوازية لمهمة العرض باستخدام Ray. يمكنك مراجعة الكود الكامل لهذه العملية في جيثب ريبو.

ingest_features يتم تعريف الطريقة داخل فئة تسمى Featurestore. نلاحظ أن Featurestore تم تزيين الفصل ب @ray.remote. يشير هذا إلى أن مثيل هذه الفئة هو ممثل راي، وهو وحدة حسابية ذات حالة ومتزامنة داخل راي. إنه نموذج برمجة يسمح لك بإنشاء كائنات موزعة تحافظ على حالة داخلية ويمكن الوصول إليها بشكل متزامن من خلال مهام متعددة تعمل على عقد مختلفة في مجموعة Ray. توفر الجهات الفاعلة طريقة لإدارة وتغليف الحالة القابلة للتغيير، مما يجعلها ذات قيمة لبناء تطبيقات معقدة وذات حالة في بيئة موزعة. يمكنك تحديد متطلبات الموارد في الجهات الفاعلة أيضًا. في هذه الحالة، كل حالة من FeatureStore سوف تتطلب الفئة 0.5 وحدة المعالجة المركزية. انظر الكود التالي:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

يمكنك التفاعل مع الممثل عن طريق الاتصال بالرقم remote المشغل أو العامل. في التعليمة البرمجية التالية، يتم تمرير العدد المطلوب من الممثلين كوسيطة إدخال للبرنامج النصي. يتم بعد ذلك تقسيم البيانات بناءً على عدد الجهات الفاعلة وتمريرها إلى العمليات المتوازية عن بعد ليتم استيعابها في مخزن الميزات. تستطيع الاتصال get على مرجع الكائن لمنع تنفيذ المهمة الحالية حتى اكتمال الحساب عن بعد وتكون النتيجة متاحة. عندما تكون النتيجة متاحة، ray.get سيُرجع النتيجة، وسيستمر تنفيذ المهمة الحالية.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

إعداد البيانات للتدريب والتحقق من الصحة والاختبار

في هذه الخطوة، نستخدم Ray Dataset لتقسيم مجموعة البيانات لدينا وتحويلها وتوسيع نطاقها بكفاءة استعدادًا للتعلم الآلي. توفر Ray Dataset طريقة قياسية لتحميل البيانات الموزعة إلى Ray، مما يدعم أنظمة التخزين وتنسيقات الملفات المختلفة. يحتوي على واجهات برمجة التطبيقات لعمليات المعالجة المسبقة لبيانات ML الشائعة مثل التحويلات المتوازية والخلط والتجميع والتجميع. تتعامل Ray Dataset أيضًا مع العمليات التي تحتاج إلى إعداد محدد وتسريع GPU. فهو يتكامل بسلاسة مع مكتبات معالجة البيانات الأخرى مثل Spark وPandas وNumPy والمزيد، بالإضافة إلى أطر عمل ML مثل TensorFlow وPyTorch. يتيح ذلك إنشاء خطوط أنابيب بيانات شاملة وسير عمل تعلم الآلة أعلى Ray. الهدف هو جعل معالجة البيانات الموزعة وتعلم الآلة أسهل للممارسين والباحثين.

دعونا نلقي نظرة على أقسام البرامج النصية التي تقوم بالمعالجة المسبقة للبيانات. نبدأ بتحميل البيانات من متجر الميزات:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

نقوم بعد ذلك بتقسيم البيانات وقياسها باستخدام التجريدات ذات المستوى الأعلى المتوفرة من ray.data مكتبة:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

يتم تخزين مجموعات بيانات التدريب والتحقق والاختبار التي تمت معالجتها في Amazon S3 وسيتم تمريرها كمعلمات إدخال إلى الخطوات اللاحقة.

إجراء تدريب على النموذج وتحسين المعلمات الفائقة

بعد أن تمت معالجة بياناتنا مسبقًا وجاهزة للنمذجة، فقد حان الوقت لتدريب بعض نماذج تعلم الآلة وضبط معلماتها الفائقة لتحقيق أقصى قدر من الأداء التنبؤي. نحن نستخدم XGBoost- راي، وهي واجهة خلفية موزعة لـ XGBoost مبنية على Ray والتي تتيح تدريب نماذج XGBoost على مجموعات البيانات الكبيرة باستخدام عقد متعددة ووحدات معالجة الرسومات. فهو يوفر بدائل بسيطة لتدريب XGBoost والتنبؤ بواجهات برمجة التطبيقات أثناء التعامل مع تعقيدات إدارة البيانات الموزعة والتدريب تحت الغطاء.

لتمكين توزيع التدريب على عقد متعددة، نستخدم فئة مساعدة تسمى RayHelper. كما هو موضح في الكود التالي، نستخدم تكوين الموارد لمهمة التدريب ونختار المضيف الأول ليكون العقدة الرئيسية:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

يمكننا استخدام معلومات المضيف لتحديد كيفية تهيئة Ray في كل حالة من حالات مهمة التدريب:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

عند بدء مهمة التدريب، يمكن تهيئة مجموعة Ray عن طريق استدعاء start_ray() طريقة على سبيل المثال RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

نستخدم بعد ذلك مدرب XGBoost من XGBoost-Ray للتدريب:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

لاحظ أنه أثناء إنشاء مثيل لـ trainer، لقد نجحنا RayParams، والذي يأخذ عدد الجهات الفاعلة وعدد وحدات المعالجة المركزية لكل جهة فاعلة. يستخدم XGBoost-Ray هذه المعلومات لتوزيع التدريب عبر جميع العقد المرتبطة بمجموعة Ray.

نقوم الآن بإنشاء كائن تقدير XGBoost استنادًا إلى SageMaker Python SDK واستخدامه في مهمة HPO.

قم بتنسيق الخطوات السابقة باستخدام SageMaker Pipelines

لإنشاء سير عمل تعلم الآلة قابل للتطوير وقابل لإعادة الاستخدام من طرف إلى طرف، نحتاج إلى استخدام أداة CI/CD لتنسيق الخطوات السابقة في المسار. تتمتع SageMaker Pipelines بتكامل مباشر مع SageMaker، وSageMaker Python SDK، وSageMaker Studio. يتيح لك هذا التكامل إنشاء مسارات عمل تعلم الآلة باستخدام Python SDK سهل الاستخدام، ثم تصور سير العمل وإدارته باستخدام SageMaker Studio. يمكنك أيضًا تتبع سجل بياناتك ضمن تنفيذ المسار وتعيين خطوات للتخزين المؤقت.

تقوم خطوط أنابيب SageMaker بإنشاء رسم بياني غير دوري موجه (DAG) يتضمن الخطوات اللازمة لإنشاء سير عمل تعلم الآلة. كل خط أنابيب عبارة عن سلسلة من الخطوات المترابطة التي يتم تنظيمها بواسطة تبعيات البيانات بين الخطوات، ويمكن تحديد معلماتها، مما يسمح لك بتوفير متغيرات الإدخال كمعلمات لكل تشغيل لخط الأنابيب. تحتوي خطوط أنابيب SageMaker على أربعة أنواع من معلمات خطوط الأنابيب: ParameterString, ParameterInteger, ParameterFloatو ParameterBoolean. في هذا القسم، نقوم بتحديد معلمات بعض متغيرات الإدخال وإعداد تكوين التخزين المؤقت للخطوة:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

نحدد خطوتين للمعالجة: واحدة لاستيعاب SageMaker Features Store، والأخرى لإعداد البيانات. يجب أن يبدو هذا مشابهًا جدًا للخطوات السابقة الموضحة مسبقًا. السطر الجديد الوحيد من التعليمات البرمجية هو ProcessingStep بعد تعريف الخطوات، مما يسمح لنا بأخذ تكوين مهمة المعالجة وإدراجها كخطوة مسار. نحدد أيضًا تبعية خطوة إعداد البيانات في خطوة استيعاب SageMaker Features Store. انظر الكود التالي:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

وبالمثل، لبناء خطوة تدريب وضبط نموذجية، نحتاج إلى إضافة تعريف لـ TuningStep بعد التعليمات البرمجية لخطوة تدريب النموذج للسماح لنا بتشغيل ضبط المعلمة الفائقة لـ SageMaker كخطوة في المسار:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

بعد خطوة الضبط، نختار تسجيل أفضل نموذج في SageMaker Model Registry. للتحكم في جودة النموذج، نقوم بتطبيق الحد الأدنى من بوابة الجودة التي تقارن المقياس الموضوعي لأفضل نموذج (RMSE) مع الحد المحدد كمعلمة إدخال خط الأنابيب rmse_threshold. لإجراء هذا التقييم، نقوم بإنشاء خطوة معالجة أخرى لتشغيل ملف السيناريو التقييم. سيتم تخزين نتيجة تقييم النموذج كملف خاصية. تعتبر ملفات الخصائص مفيدة بشكل خاص عند تحليل نتائج خطوة المعالجة لتحديد كيفية تشغيل الخطوات الأخرى. انظر الكود التالي:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

نحدد أ ModelStep لتسجيل أفضل نموذج في SageMaker Model Registry في خط أنابيبنا. في حالة عدم اجتياز النموذج الأفضل لفحص الجودة المحدد مسبقًا، فإننا نحدد بالإضافة إلى ذلك أ FailStep لإخراج رسالة خطأ:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

بعد ذلك ، نستخدم ملف ConditionStep لتقييم ما إذا كان ينبغي اتخاذ خطوة تسجيل النموذج أو خطوة الفشل بعد ذلك في المسار. في حالتنا، سيتم تسجيل أفضل نموذج إذا كانت درجة RMSE الخاصة به أقل من العتبة.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

أخيرًا، نقوم بتنسيق جميع الخطوات المحددة في مسار:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

يمكن تصور المسار السابق وتنفيذه مباشرة في SageMaker Studio، أو يمكن تنفيذه عن طريق الاتصال execution = training_pipeline.start(). ويوضح الشكل التالي تدفق خط الأنابيب.

خط أنابيب SageMaker DAG

بالإضافة إلى ذلك، يمكننا مراجعة نسب القطع الأثرية الناتجة عن تنفيذ خط الأنابيب.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

انشر النموذج

بعد تسجيل أفضل نموذج في SageMaker Model Registry عبر تشغيل المسار، نقوم بنشر النموذج إلى نقطة نهاية في الوقت الفعلي باستخدام إمكانات نشر النموذج المُدارة بالكامل لـ SageMaker. لدى SageMaker خيارات نشر نماذج أخرى لتلبية احتياجات حالات الاستخدام المختلفة. لمزيد من التفاصيل، راجع نشر نماذج للاستدلال عند اختيار الخيار المناسب لحالة الاستخدام الخاصة بك. أولاً، دعونا نسجل النموذج في SageMaker Model Registry:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

الوضع الحالي للنموذج هو PendingApproval. نحن بحاجة إلى ضبط حالته على Approved قبل النشر:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

تنظيف

بعد الانتهاء من التجربة، تذكر تنظيف الموارد لتجنب الرسوم غير الضرورية. للتنظيف، احذف نقطة النهاية في الوقت الفعلي ومجموعة النماذج وخطوط الأنابيب ومجموعة الميزات عن طريق استدعاء واجهات برمجة التطبيقات حذفنقطة النهاية, حذفModelPackageGroup, حذف خط الأنابيبو حذف مجموعة الميزات، على التوالي، وأغلق جميع مثيلات دفتر ملاحظات SageMaker Studio.

وفي الختام

يوضح هذا المنشور إرشادات خطوة بخطوة حول كيفية استخدام SageMaker Pipelines لتنظيم سير عمل تعلم الآلة المستند إلى Ray. لقد أظهرنا أيضًا قدرة SageMaker Pipelines على التكامل مع أدوات التعلم الآلي التابعة لجهات خارجية. هناك العديد من خدمات AWS التي تدعم أعباء عمل Ray بطريقة قابلة للتطوير وآمنة لضمان التميز في الأداء والكفاءة التشغيلية. الآن، حان دورك لاستكشاف هذه الإمكانات القوية والبدء في تحسين سير عمل تعلم الآلة لديك باستخدام Amazon SageMaker Pipelines وRay. اتخذ إجراءً اليوم واطلق العنان للإمكانات الكاملة لمشاريع تعلم الآلة الخاصة بك!


عن المؤلف

قم بتنسيق سير عمل التعلم الآلي المستند إلى Ray باستخدام Amazon SageMaker | أمازون ويب سيرفيسز PlatoBlockchain Data Intelligence. البحث العمودي. منظمة العفو الدولية.راجو رانجان هو مهندس حلول أول في Amazon Web Services (AWS). وهو يعمل مع الكيانات التي ترعاها الحكومة، ويساعدهم في بناء حلول الذكاء الاصطناعي/تعلم الآلة باستخدام AWS. عندما لا تعبث بالحلول السحابية، ستلاحظه وهو يتسكع مع العائلة أو يحطم الطيور في لعبة كرة الريشة المفعمة بالحيوية مع الأصدقاء.

قم بتنسيق سير عمل التعلم الآلي المستند إلى Ray باستخدام Amazon SageMaker | أمازون ويب سيرفيسز PlatoBlockchain Data Intelligence. البحث العمودي. منظمة العفو الدولية.شيري دينج هو أحد كبار مهندسي الحلول المتخصصة في الذكاء الاصطناعي/تعلم الآلة في Amazon Web Services (AWS). تتمتع بخبرة واسعة في التعلم الآلي وحصلت على درجة الدكتوراه في علوم الكمبيوتر. وهي تعمل بشكل أساسي مع عملاء القطاع العام على مختلف تحديات الأعمال المتعلقة بالذكاء الاصطناعي/التعلم الآلي، مما يساعدهم على تسريع رحلة التعلم الآلي الخاصة بهم على سحابة AWS. عندما لا تساعد العملاء، فإنها تستمتع بالأنشطة الخارجية.

الطابع الزمني:

اكثر من التعلم الآلي من AWS