با استفاده از Amazon SageMaker | گردش کار یادگیری ماشین مبتنی بر Ray را هماهنگ کنید خدمات وب آمازون

با استفاده از Amazon SageMaker | گردش کار یادگیری ماشین مبتنی بر Ray را هماهنگ کنید خدمات وب آمازون

یادگیری ماشین (ML) به طور فزاینده ای پیچیده می شود زیرا مشتریان سعی می کنند مشکلات چالش برانگیز بیشتری را حل کنند. این پیچیدگی اغلب منجر به نیاز به ML توزیع شده می شود که در آن چندین ماشین برای آموزش یک مدل واحد استفاده می شود. اگرچه این امر موازی سازی وظایف در چندین گره را ممکن می کند، که منجر به تسریع زمان آموزش، افزایش مقیاس پذیری و بهبود عملکرد می شود، چالش های مهمی در استفاده موثر از سخت افزار توزیع شده وجود دارد. دانشمندان داده باید به چالش هایی مانند پارتیشن بندی داده ها، تعادل بار، تحمل خطا و مقیاس پذیری رسیدگی کنند. مهندسان ML باید موازی‌سازی، زمان‌بندی، خطاها و تلاش‌های مجدد را به صورت دستی انجام دهند که به کد زیرساخت پیچیده نیاز دارد.

در این پست به مزایای استفاده از آن می پردازیم اشعه و آمازون SageMaker برای ML توزیع شده، و یک راهنمای گام به گام در مورد نحوه استفاده از این چارچوب ها برای ایجاد و استقرار یک گردش کار ML مقیاس پذیر ارائه کنید.

Ray، یک چارچوب محاسباتی توزیع‌شده منبع باز، چارچوبی انعطاف‌پذیر برای آموزش توزیع شده و ارائه مدل‌های ML فراهم می‌کند. جزئیات سیستم توزیع‌شده سطح پایین را از طریق کتابخانه‌های ساده و مقیاس‌پذیر برای کارهای رایج ML مانند پیش‌پردازش داده‌ها، آموزش توزیع‌شده، تنظیم فراپارامتر، یادگیری تقویتی و سرویس‌دهی مدل، جمع‌آوری می‌کند.

SageMaker یک سرویس کاملاً مدیریت شده برای ساخت، آموزش و استقرار مدل‌های ML است. Ray به‌طور یکپارچه با ویژگی‌های SageMaker ادغام می‌شود تا بارهای کاری پیچیده ML را ایجاد و به کار ببرد که هم کارآمد و هم قابل اعتماد هستند. ترکیب Ray و SageMaker قابلیت‌های سرتاسری را برای گردش‌های کاری مقیاس‌پذیر ML فراهم می‌کند و دارای ویژگی‌های برجسته زیر است:

  • بازیگران توزیع شده و ساختارهای موازی در Ray توسعه برنامه های کاربردی توزیع شده را ساده می کنند.
  • Ray AI Runtime (AIR) اصطکاک انتقال از توسعه به تولید را کاهش می دهد. با استفاده از Ray و AIR، همان کد پایتون می تواند به طور یکپارچه از یک لپ تاپ به یک خوشه بزرگ مقیاس شود.
  • زیرساخت مدیریت‌شده SageMaker و ویژگی‌هایی مانند کارهای پردازش، کارهای آموزشی، و کارهای تنظیم فراپارامتر می‌توانند از کتابخانه‌های Ray در زیر برای محاسبات توزیع‌شده استفاده کنند.
  • آزمایشات آمازون SageMaker امکان تکرار سریع و پیگیری آزمایشات را فراهم می کند.
  • فروشگاه ویژگی آمازون SageMaker یک مخزن مقیاس پذیر برای ذخیره، بازیابی و به اشتراک گذاری ویژگی های ML برای آموزش مدل فراهم می کند.
  • مدل های آموزش دیده را می توان ذخیره، نسخه بندی و ردیابی کرد رجیستری مدل آمازون SageMaker برای حکومت و مدیریت
  • خطوط لوله آمازون SageMaker اجازه می دهد تا چرخه عمر ML سرتاسر را از آماده سازی و آموزش داده ها تا مدل سازی استقرار به عنوان گردش کار خودکار تنظیم کند.

بررسی اجمالی راه حل

این پست بر روی مزایای استفاده از Ray و SageMaker با هم تمرکز دارد. ما یک گردش کار ML مبتنی بر Ray را با استفاده از SageMaker Pipelines تنظیم کردیم. گردش کار شامل ورود موازی داده ها به فروشگاه ویژگی با استفاده از بازیگران Ray، پیش پردازش داده با Ray Data، مدل های آموزشی و تنظیم فراپارامتر در مقیاس با استفاده از Ray Train و بهینه سازی هایپرپارامتر (HPO) و در نهایت ارزیابی مدل و ثبت مدل در یک رجیستری مدل

برای داده های خود استفاده می کنیم مجموعه داده مسکن مصنوعی که از هشت ویژگی تشکیل شده است (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCHو DECK) و مدل ما پیش بینی خواهد کرد PRICE از خانه

هر مرحله در گردش کار ML به مراحل گسسته تقسیم می شود، با اسکریپت خاص خود که پارامترهای ورودی و خروجی را می گیرد. در بخش بعدی، تکه کدهای کلیدی را از هر مرحله برجسته می کنیم. کد کامل را می توان در مخزن aws-samples-for-ray GitHub.

پیش نیازها

برای استفاده از SageMaker Python SDK و اجرای کد مرتبط با این پست، به پیش نیازهای زیر نیاز دارید:

داده ها را به فروشگاه ویژگی SageMaker وارد کنید

اولین مرحله در گردش کار ML خواندن فایل داده منبع از آن است سرویس ذخیره سازی ساده آمازون (Amazon S3) در قالب CSV و آن را در فروشگاه ویژگی SageMaker وارد کنید. SageMaker Feature Store یک مخزن هدفمند است که ایجاد، اشتراک گذاری و مدیریت ویژگی های ML را برای تیم ها آسان می کند. کشف، استفاده مجدد و به اشتراک گذاری ویژگی را ساده می کند و منجر به توسعه سریع تر، افزایش همکاری در تیم های مشتری و کاهش هزینه ها می شود.

وارد کردن ویژگی ها در فروشگاه ویژگی شامل مراحل زیر است:

  1. یک گروه ویژگی تعریف کنید و گروه ویژگی را در فروشگاه ویژگی ایجاد کنید.
  2. با افزودن زمان رویداد و شناسه رکورد برای هر ردیف داده، داده منبع را برای ذخیره ویژگی آماده کنید.
  3. داده های آماده شده را با استفاده از Boto3 SDK وارد گروه ویژگی کنید.

در این بخش، ما فقط مرحله 3 را برجسته می کنیم، زیرا این قسمتی است که شامل پردازش موازی کار بلع با استفاده از Ray است. شما می توانید کد کامل این فرآیند را در قسمت مشاهده کنید GitHub repo.

La ingest_features متد در داخل کلاسی به نام تعریف شده است Featurestore. توجه داشته باشید که Featurestore کلاس تزئین شده است @ray.remote. این نشان می دهد که نمونه ای از این کلاس یک بازیگر Ray، یک واحد محاسباتی حالت دار و همزمان در Ray است. این یک مدل برنامه نویسی است که به شما امکان می دهد اشیاء توزیع شده ای ایجاد کنید که یک حالت داخلی را حفظ می کنند و می توان به طور همزمان توسط چندین کار در حال اجرا بر روی گره های مختلف در یک خوشه Ray به آنها دسترسی داشت. بازیگران راهی برای مدیریت و محصور کردن حالت تغییرپذیر فراهم می‌کنند و آنها را برای ساخت برنامه‌های پیچیده و حالت‌دار در یک محیط توزیع شده ارزشمند می‌سازند. شما می توانید منابع مورد نیاز را در بازیگران نیز مشخص کنید. در این مورد، هر نمونه از FeatureStore کلاس به 0.5 CPU نیاز دارد. کد زیر را ببینید:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

شما می توانید از طریق تماس با بازیگر با remote اپراتور. در کد زیر تعداد بازیگر مورد نظر به عنوان آرگومان ورودی به اسکریپت ارسال می شود. سپس داده ها بر اساس تعداد بازیگران تقسیم می شوند و به فرآیندهای موازی راه دور منتقل می شوند تا در فروشگاه ویژگی ها وارد شوند. می توانید تماس بگیرید get روی شی ref برای جلوگیری از اجرای وظیفه فعلی تا زمانی که محاسبه از راه دور کامل شود و نتیجه در دسترس باشد. وقتی نتیجه در دسترس است، ray.get نتیجه را برمی گرداند و اجرای کار فعلی ادامه می یابد.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

داده ها را برای آموزش، اعتبار سنجی و آزمایش آماده کنید

در این مرحله، ما از Ray Dataset برای تقسیم کارآمد، تبدیل و مقیاس‌بندی مجموعه داده‌های خود در آماده‌سازی برای یادگیری ماشین استفاده می‌کنیم. Ray Dataset یک راه استاندارد برای بارگذاری داده های توزیع شده در Ray ارائه می دهد که از سیستم های ذخیره سازی مختلف و فرمت های فایل پشتیبانی می کند. دارای APIهایی برای عملیات پیش‌پردازش داده‌های ML مانند تبدیل‌های موازی، به هم زدن، گروه‌بندی و تجمیع‌ها. Ray Dataset همچنین عملیاتی را انجام می دهد که نیاز به راه اندازی حالت و شتاب GPU دارند. این نرم افزار به راحتی با سایر کتابخانه های پردازش داده مانند Spark، Pandas، NumPy و غیره و همچنین چارچوب های ML مانند TensorFlow و PyTorch ادغام می شود. این اجازه می دهد تا خطوط لوله داده سرتاسر و گردش کار ML در بالای Ray ایجاد شود. هدف این است که پردازش داده های توزیع شده و ML را برای پزشکان و محققان آسان تر کند.

بیایید به بخش‌هایی از اسکریپت‌هایی که این پیش‌پردازش داده‌ها را انجام می‌دهند نگاه کنیم. ما با بارگیری داده ها از فروشگاه ویژگی شروع می کنیم:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

سپس داده ها را با استفاده از انتزاعات سطح بالاتری که در دسترس هستند، تقسیم و مقیاس می کنیم ray.data کتابخانه:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

مجموعه داده‌های قطار پردازش شده، اعتبارسنجی و آزمایش در آمازون S3 ذخیره می‌شوند و به عنوان پارامترهای ورودی به مراحل بعدی ارسال می‌شوند.

آموزش مدل و بهینه سازی هایپرپارامتر را انجام دهید

با داده‌های ما از پیش پردازش شده و آماده برای مدل‌سازی، زمان آموزش برخی از مدل‌های ML و تنظیم دقیق فراپارامترهای آن‌ها برای به حداکثر رساندن عملکرد پیش‌بینی فرا رسیده است. ما استفاده می کنیم XGBoost-Ray، یک باطن توزیع شده برای XGBoost که بر روی Ray ساخته شده است که آموزش مدل های XGBoost را روی مجموعه داده های بزرگ با استفاده از چندین گره و GPU امکان پذیر می کند. این برنامه جایگزین‌های ساده‌ای برای قطار XGBoost و پیش‌بینی APIها و در عین حال مدیریت پیچیدگی‌های مدیریت داده‌های توزیع‌شده و آموزش زیر پوشش ارائه می‌کند.

برای فعال کردن توزیع آموزش روی چندین گره، از یک کلاس کمکی به نام استفاده می کنیم RayHelper. همانطور که در کد زیر نشان داده شده است، ما از پیکربندی منبع شغل آموزشی استفاده می کنیم و اولین هاست را به عنوان گره اصلی انتخاب می کنیم:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

ما می توانیم از اطلاعات میزبان برای تصمیم گیری در مورد نحوه مقداردهی اولیه Ray در هر یک از نمونه های شغلی آموزشی استفاده کنیم:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

هنگامی که یک کار آموزشی شروع می شود، یک خوشه Ray را می توان با فراخوانی مقداردهی اولیه کرد start_ray() روش در یک نمونه از RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

سپس از ترینر XGBoost از XGBoost-Ray برای آموزش استفاده می کنیم:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

توجه داشته باشید که هنگام نمونه سازی trainer، عبور می کنیم RayParams، که تعداد بازیگران و تعداد CPU در هر بازیگر را می گیرد. XGBoost-Ray از این اطلاعات برای توزیع آموزش در تمام گره های متصل به خوشه Ray استفاده می کند.

اکنون یک شی تخمین‌گر XGBoost بر اساس SageMaker Python SDK ایجاد می‌کنیم و از آن برای کار HPO استفاده می‌کنیم.

مراحل قبل را با استفاده از SageMaker Pipelines هماهنگ کنید

برای ایجاد یک گردش کار ML مقیاس پذیر و قابل استفاده مجدد از انتها به انتها، باید از یک ابزار CI/CD برای هماهنگ کردن مراحل قبلی در خط لوله استفاده کنیم. SageMaker Pipelines دارای ادغام مستقیم با SageMaker، SageMaker Python SDK و SageMaker Studio است. این ادغام به شما این امکان را می‌دهد که گردش‌های کاری ML را با استفاده آسان Python SDK ایجاد کنید و سپس با استفاده از SageMaker Studio گردش کار خود را تجسم و مدیریت کنید. همچنین می توانید تاریخچه داده های خود را در اجرای خط لوله ردیابی کنید و مراحل ذخیره سازی را تعیین کنید.

SageMaker Pipelines یک گراف غیر چرخه جهت دار (DAG) ایجاد می کند که شامل مراحل مورد نیاز برای ایجاد یک گردش کار ML است. هر خط لوله مجموعه ای از مراحل به هم پیوسته است که توسط وابستگی های داده بین مراحل هماهنگ شده است و می تواند پارامتری شود و به شما امکان می دهد متغیرهای ورودی را به عنوان پارامتر برای هر اجرای خط لوله ارائه دهید. SageMaker Pipelines دارای چهار نوع پارامتر خط لوله است: ParameterString, ParameterInteger, ParameterFloatو ParameterBoolean. در این بخش، برخی از متغیرهای ورودی را پارامتر می کنیم و پیکربندی گام کش را تنظیم می کنیم:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

ما دو مرحله پردازش را تعریف می کنیم: یکی برای SageMaker Feature Store و دیگری برای آماده سازی داده ها. این باید بسیار شبیه به مراحل قبلی باشد که قبلا توضیح داده شد. تنها خط جدید کد این است ProcessingStep پس از تعریف مراحل، که به ما امکان می دهد پیکربندی کار پردازش را انجام دهیم و آن را به عنوان یک مرحله خط لوله قرار دهیم. ما بیشتر وابستگی مرحله آماده سازی داده ها را به مرحله جذب فروشگاه ویژگی SageMaker مشخص می کنیم. کد زیر را ببینید:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

به طور مشابه، برای ساخت یک گام آموزش و تنظیم مدل، باید تعریفی از آن اضافه کنیم TuningStep پس از کد مرحله آموزش مدل به ما اجازه می دهد تا تنظیم هایپرپارامتر SageMaker را به عنوان یک مرحله در خط لوله اجرا کنیم:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

پس از مرحله تنظیم، ما انتخاب می کنیم که بهترین مدل را در SageMaker Model Registry ثبت کنیم. برای کنترل کیفیت مدل، یک دروازه حداقل کیفیت را پیاده سازی می کنیم که متریک هدف بهترین مدل (RMSE) را با آستانه تعریف شده به عنوان پارامتر ورودی خط لوله مقایسه می کند. rmse_threshold. برای انجام این ارزیابی، مرحله پردازش دیگری را برای اجرای an ایجاد می کنیم اسکریپت ارزیابی. نتیجه ارزیابی مدل به عنوان یک فایل ویژگی ذخیره می شود. فایل های Property به ویژه هنگام تجزیه و تحلیل نتایج یک مرحله پردازش برای تصمیم گیری در مورد نحوه اجرای سایر مراحل مفید هستند. کد زیر را ببینید:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

ما تعریف a ModelStep برای ثبت بهترین مدل در SageMaker Model Registry در خط لوله ما. در صورتی که بهترین مدل از بررسی کیفیت از پیش تعیین شده ما عبور نکند، ما همچنین a را مشخص می کنیم FailStep برای خروجی یک پیام خطا:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

بعد از a استفاده می کنیم ConditionStep برای ارزیابی اینکه آیا مرحله ثبت مدل یا مرحله شکست باید در مرحله بعدی انجام شود. در مورد ما، بهترین مدل در صورتی ثبت می شود که امتیاز RMSE آن کمتر از آستانه باشد.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

در نهایت، ما تمام مراحل تعریف شده را در یک خط لوله هماهنگ می کنیم:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

خط لوله قبلی را می توان مستقیماً در SageMaker Studio تجسم و اجرا کرد یا با فراخوانی اجرا شد. execution = training_pipeline.start(). شکل زیر جریان خط لوله را نشان می دهد.

SageMaker Pipeline DAG

علاوه بر این، ما می توانیم دودمان مصنوعات تولید شده توسط اجرای خط لوله را بررسی کنیم.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

مدل را مستقر کنید

پس از اینکه بهترین مدل در SageMaker Model Registry از طریق اجرای خط لوله ثبت شد، مدل را با استفاده از قابلیت های استقرار مدل کاملاً مدیریت شده SageMaker در یک نقطه پایانی بلادرنگ مستقر می کنیم. SageMaker گزینه های دیگری برای استقرار مدل برای رفع نیازهای موارد استفاده مختلف دارد. برای جزئیات به استقرار مدل ها برای استنتاج هنگام انتخاب گزینه مناسب برای مورد استفاده خود. ابتدا بیایید مدل ثبت شده در SageMaker Model Registry را دریافت کنیم:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

وضعیت فعلی مدل است PendingApproval. باید وضعیت آن را روی آن تنظیم کنیم Approved قبل از استقرار:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

پاک کردن

پس از اتمام آزمایش، به خاطر داشته باشید که منابع را تمیز کنید تا از هزینه های غیر ضروری جلوگیری کنید. برای پاکسازی، نقطه پایانی بلادرنگ، گروه مدل، خط لوله و گروه ویژگی را با فراخوانی APIها حذف کنید. DeleteEndpoint, DeleteModelPackageGroup, DeletePipelineو DeleteFeatureGroupبه ترتیب، و تمام نمونه های نوت بوک SageMaker Studio را خاموش کنید.

نتیجه

این پست یک راهنما گام به گام در مورد نحوه استفاده از SageMaker Pipelines برای هماهنگ کردن جریان های کاری ML مبتنی بر Ray را نشان می دهد. ما همچنین قابلیت SageMaker Pipelines را برای ادغام با ابزارهای ML شخص ثالث نشان دادیم. سرویس‌های AWS مختلفی وجود دارند که از بارهای کاری Ray به شکلی مقیاس‌پذیر و ایمن پشتیبانی می‌کنند تا از عملکرد عالی و کارایی عملیاتی اطمینان حاصل کنند. اکنون، نوبت شماست که این قابلیت‌های قدرتمند را کشف کنید و با Amazon SageMaker Pipelines و Ray بهینه‌سازی گردش‌های کاری یادگیری ماشینی خود را آغاز کنید. امروز اقدام کنید و پتانسیل کامل پروژه های ML خود را باز کنید!


درباره نویسنده

با استفاده از Amazon SageMaker | گردش کار یادگیری ماشین مبتنی بر Ray را هماهنگ کنید خدمات وب آمازون هوش داده پلاتو بلاک چین. جستجوی عمودی Ai.راجو رنگان یک معمار ارشد راه حل در خدمات وب آمازون (AWS) است. او با نهادهای تحت حمایت دولت کار می کند و به آنها کمک می کند تا راه حل های AI/ML را با استفاده از AWS بسازند. وقتی با راه‌حل‌های ابری سر و کله نمی‌زنید، او را در حال معاشرت با خانواده یا له کردن مرغ‌ها در یک بازی پر جنب و جوش بدمینتون با دوستان خواهید گرفت.

با استفاده از Amazon SageMaker | گردش کار یادگیری ماشین مبتنی بر Ray را هماهنگ کنید خدمات وب آمازون هوش داده پلاتو بلاک چین. جستجوی عمودی Ai.شری دینگ یک معمار ارشد راه حل های متخصص AI/ML در خدمات وب آمازون (AWS) است. او تجربه گسترده ای در یادگیری ماشین با مدرک دکترا در علوم کامپیوتر دارد. او عمدتاً با مشتریان بخش عمومی در چالش‌های مختلف تجاری مرتبط با هوش مصنوعی کار می‌کند و به آنها کمک می‌کند سفر یادگیری ماشینی خود را در AWS Cloud تسریع کنند. وقتی به مشتریان کمک نمی کند، از فعالیت های بیرون از خانه لذت می برد.

تمبر زمان:

بیشتر از آموزش ماشین AWS