จัดระเบียบเวิร์กโฟลว์แมชชีนเลิร์นนิงที่ใช้ Ray โดยใช้ Amazon SageMaker | อเมซอนเว็บเซอร์วิส

จัดระเบียบเวิร์กโฟลว์แมชชีนเลิร์นนิงที่ใช้ Ray โดยใช้ Amazon SageMaker | อเมซอนเว็บเซอร์วิส

การเรียนรู้ของเครื่อง (ML) มีความซับซ้อนมากขึ้นเรื่อยๆ เนื่องจากลูกค้าพยายามแก้ไขปัญหาที่ท้าทายมากขึ้นเรื่อยๆ ความซับซ้อนนี้มักจะนำไปสู่ความต้องการ ML แบบกระจาย โดยมีการใช้เครื่องจักรหลายเครื่องในการฝึกโมเดลเดียว แม้ว่าสิ่งนี้จะทำให้งานต่างๆ ขนานกันในหลายโหนด ซึ่งนำไปสู่เวลาการฝึกอบรมที่เร็วขึ้น ความสามารถในการปรับขนาดที่เพิ่มขึ้น และประสิทธิภาพที่ดีขึ้น แต่ก็มีความท้าทายที่สำคัญในการใช้ฮาร์ดแวร์แบบกระจายอย่างมีประสิทธิภาพ นักวิทยาศาสตร์ข้อมูลต้องจัดการกับความท้าทายต่างๆ เช่น การแบ่งพาร์ติชันข้อมูล การปรับสมดุลโหลด ความทนทานต่อข้อผิดพลาด และความสามารถในการปรับขนาด วิศวกร ML ต้องจัดการการทำงานแบบขนาน การกำหนดเวลา ข้อบกพร่อง และลองใหม่ด้วยตนเอง ซึ่งต้องใช้โค้ดโครงสร้างพื้นฐานที่ซับซ้อน

ในโพสต์นี้ เราจะพูดถึงประโยชน์ของการใช้ รังสี และ อเมซอน SageMaker สำหรับ ML แบบกระจาย และให้คำแนะนำทีละขั้นตอนเกี่ยวกับวิธีใช้เฟรมเวิร์กเหล่านี้เพื่อสร้างและปรับใช้เวิร์กโฟลว์ ML ที่ปรับขนาดได้

Ray ซึ่งเป็นเฟรมเวิร์กการประมวลผลแบบกระจายแบบโอเพ่นซอร์ส มอบเฟรมเวิร์กที่ยืดหยุ่นสำหรับการฝึกอบรมแบบกระจายและการให้บริการโมเดล ML โดยสรุปรายละเอียดระบบแบบกระจายระดับต่ำผ่านไลบรารีที่เรียบง่ายและปรับขนาดได้สำหรับงาน ML ทั่วไป เช่น การประมวลผลข้อมูลล่วงหน้า การฝึกอบรมแบบกระจาย การปรับแต่งไฮเปอร์พารามิเตอร์ การเรียนรู้แบบเสริมกำลัง และการให้บริการโมเดล

SageMaker เป็นบริการที่มีการจัดการเต็มรูปแบบสำหรับการสร้าง การฝึกอบรม และการปรับใช้โมเดล ML Ray ผสานรวมกับคุณสมบัติ SageMaker ได้อย่างราบรื่นเพื่อสร้างและปรับใช้ปริมาณงาน ML ที่ซับซ้อนซึ่งทั้งมีประสิทธิภาพและเชื่อถือได้ การรวมกันของ Ray และ SageMaker มอบความสามารถแบบ end-to-end สำหรับเวิร์กโฟลว์ ML ที่ปรับขนาดได้ และมีฟีเจอร์เด่นดังต่อไปนี้:

  • ตัวแสดงแบบกระจายและโครงสร้างความเท่าเทียมใน Ray ช่วยให้การพัฒนาแอปพลิเคชันแบบกระจายง่ายขึ้น
  • Ray AI Runtime (AIR) ช่วยลดแรงเสียดทานจากการพัฒนาไปสู่การผลิต ด้วย Ray และ AIR รหัส Python เดียวกันสามารถปรับขนาดจากแล็ปท็อปไปจนถึงคลัสเตอร์ขนาดใหญ่ได้อย่างราบรื่น
  • โครงสร้างพื้นฐานที่ได้รับการจัดการของ SageMaker และฟีเจอร์ต่างๆ เช่น งานประมวลผล งานฝึกอบรม และงานปรับแต่งไฮเปอร์พารามิเตอร์ สามารถใช้ไลบรารี Ray ที่อยู่ด้านล่างสำหรับการประมวลผลแบบกระจาย
  • การทดลอง Amazon SageMaker ช่วยให้สามารถทำซ้ำและติดตามการทดลองได้อย่างรวดเร็ว
  • Amazon SageMaker ฟีเจอร์สโตร์ มอบพื้นที่เก็บข้อมูลที่ปรับขนาดได้สำหรับจัดเก็บ เรียกดู และแชร์คุณสมบัติ ML สำหรับการฝึกโมเดล
  • โมเดลที่ผ่านการฝึกอบรมสามารถจัดเก็บ กำหนดเวอร์ชัน และติดตามได้ รีจิสทรีโมเดล Amazon SageMaker เพื่อการกำกับดูแลและการจัดการ
  • ท่อส่ง Amazon SageMaker ช่วยให้สามารถประสานวงจรชีวิต ML แบบ end-to-end ตั้งแต่การเตรียมข้อมูลและการฝึกอบรมไปจนถึงการปรับใช้แบบจำลองเป็นเวิร์กโฟลว์อัตโนมัติ

ภาพรวมโซลูชัน

โพสต์นี้เน้นที่ประโยชน์ของการใช้ Ray และ SageMaker ร่วมกัน เราตั้งค่าเวิร์กโฟลว์ ML แบบ end-to-end ที่ใช้ Ray ซึ่งจัดการโดยใช้ SageMaker Pipelines ขั้นตอนการทำงานประกอบด้วยการนำเข้าข้อมูลแบบขนานไปยังที่จัดเก็บฟีเจอร์โดยใช้นักแสดงของ Ray การประมวลผลข้อมูลล่วงหน้าด้วย Ray Data โมเดลการฝึกอบรมและการปรับแต่งไฮเปอร์พารามิเตอร์ตามขนาดโดยใช้ Ray Train และงานการปรับแต่งไฮเปอร์พารามิเตอร์ (HPO) และสุดท้ายคือการประเมินโมเดลและการลงทะเบียนโมเดลลงใน รีจิสทรีโมเดล

สำหรับข้อมูลของเราเราใช้ ชุดข้อมูลที่อยู่อาศัยสังเคราะห์ ซึ่งประกอบด้วยคุณสมบัติ XNUMX ประการ (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCHและ DECK) และแบบจำลองของเราจะทำนาย PRICE ของบ้าน

แต่ละขั้นตอนในเวิร์กโฟลว์ ML แบ่งออกเป็นขั้นตอนแยกกัน โดยมีสคริปต์ของตัวเองที่รับพารามิเตอร์อินพุตและเอาต์พุต ในส่วนถัดไป เราจะไฮไลต์ข้อมูลโค้ดคีย์จากแต่ละขั้นตอน สามารถดูรหัสเต็มได้ที่ พื้นที่เก็บข้อมูล GitHub aws-samples-for-ray.

เบื้องต้น

หากต้องการใช้ SageMaker Python SDK และเรียกใช้โค้ดที่เกี่ยวข้องกับโพสต์นี้ คุณต้องมีข้อกำหนดเบื้องต้นต่อไปนี้:

นำเข้าข้อมูลไปยัง SageMaker Feature Store

ขั้นตอนแรกในเวิร์กโฟลว์ ML คือการอ่านไฟล์ข้อมูลต้นฉบับ บริการจัดเก็บข้อมูลอย่างง่ายของ Amazon (Amazon S3) ในรูปแบบ CSV และนำเข้าไปยัง SageMaker Feature Store SageMaker Feature Store เป็นพื้นที่เก็บข้อมูลที่สร้างขึ้นโดยเฉพาะซึ่งช่วยให้ทีมสามารถสร้าง แชร์ และจัดการคุณสมบัติ ML ได้อย่างง่ายดาย ช่วยให้การค้นพบคุณลักษณะ การใช้ซ้ำ และการแชร์ง่ายขึ้น นำไปสู่การพัฒนาที่เร็วขึ้น เพิ่มการทำงานร่วมกันภายในทีมลูกค้า และลดต้นทุน

การนำเข้าคุณลักษณะเข้าไปในที่เก็บคุณลักษณะประกอบด้วยขั้นตอนต่อไปนี้:

  1. กำหนดกลุ่มคุณลักษณะและสร้างกลุ่มคุณลักษณะในที่เก็บคุณลักษณะ
  2. เตรียมแหล่งข้อมูลสำหรับที่เก็บคุณลักษณะโดยเพิ่มเวลาเหตุการณ์และ ID เร็กคอร์ดสำหรับข้อมูลแต่ละแถว
  3. นำเข้าข้อมูลที่เตรียมไว้ลงในกลุ่มคุณลักษณะโดยใช้ Boto3 SDK

ในส่วนนี้ เราเน้นเฉพาะขั้นตอนที่ 3 เนื่องจากเป็นส่วนที่เกี่ยวข้องกับการประมวลผลงานนำเข้าแบบขนานโดยใช้ Ray คุณสามารถตรวจสอบโค้ดแบบเต็มสำหรับกระบวนการนี้ได้ใน repo GitHub.

พื้นที่ นำเข้า_features วิธีการถูกกำหนดไว้ภายในคลาสที่เรียกว่า Featurestore. โปรดทราบว่าไฟล์ Featurestore ชั้นเรียนตกแต่งด้วย @ray.remote. สิ่งนี้บ่งชี้ว่าอินสแตนซ์ของคลาสนี้คือนักแสดง Ray ซึ่งเป็นหน่วยการคำนวณที่มีสถานะและเกิดขึ้นพร้อมกันภายใน Ray เป็นโมเดลการเขียนโปรแกรมที่ช่วยให้คุณสร้างอ็อบเจ็กต์แบบกระจายที่รักษาสถานะภายใน และสามารถเข้าถึงได้พร้อมกันโดยงานหลายอย่างที่ทำงานบนโหนดที่แตกต่างกันในคลัสเตอร์ Ray นักแสดงจัดเตรียมวิธีในการจัดการและสรุปสถานะที่ไม่แน่นอน ทำให้มีคุณค่าสำหรับการสร้างแอปพลิเคชัน stateful ที่ซับซ้อนในสภาพแวดล้อมแบบกระจาย คุณสามารถระบุความต้องการทรัพยากรในตัวนักแสดงได้เช่นกัน ในกรณีนี้ แต่ละกรณีของ FeatureStore คลาสจะต้องมี CPU 0.5 ตัว ดูรหัสต่อไปนี้:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

คุณสามารถโต้ตอบกับนักแสดงได้โดยการโทรไปที่ remote ตัวดำเนินการ ในโค้ดต่อไปนี้ จำนวนนักแสดงที่ต้องการจะถูกส่งผ่านเป็นอาร์กิวเมนต์อินพุตไปยังสคริปต์ จากนั้นข้อมูลจะถูกแบ่งพาร์ติชันตามจำนวนนักแสดง และส่งผ่านไปยังกระบวนการขนานระยะไกลที่จะนำเข้าไปยังที่เก็บคุณลักษณะ คุณสามารถโทร get บนวัตถุอ้างอิงเพื่อบล็อกการทำงานของงานปัจจุบันจนกว่าการคำนวณระยะไกลจะเสร็จสมบูรณ์และผลลัพธ์จะพร้อมใช้งาน เมื่อได้ผลแล้ว ray.get จะส่งกลับผลลัพธ์ และการดำเนินการของงานปัจจุบันจะดำเนินต่อไป

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

เตรียมข้อมูลสำหรับการฝึกอบรม การตรวจสอบ และการทดสอบ

ในขั้นตอนนี้ เราใช้ Ray Dataset เพื่อแยก แปลง และปรับขนาดชุดข้อมูลของเราอย่างมีประสิทธิภาพเพื่อเตรียมพร้อมสำหรับการเรียนรู้ของเครื่อง Ray Dataset มอบวิธีมาตรฐานในการโหลดข้อมูลแบบกระจายไปยัง Ray ซึ่งรองรับระบบจัดเก็บข้อมูลและรูปแบบไฟล์ต่างๆ มี API สำหรับการดำเนินการประมวลผลข้อมูล ML ทั่วไปล่วงหน้า เช่น การแปลงแบบขนาน การสับเปลี่ยน การจัดกลุ่ม และการรวมกลุ่ม Ray Dataset ยังจัดการการทำงานที่ต้องมีการตั้งค่าแบบมีสถานะและการเร่งความเร็ว GPU โดยผสานรวมได้อย่างราบรื่นกับไลบรารีการประมวลผลข้อมูลอื่นๆ เช่น Spark, Pandas, NumPy และอื่นๆ อีกมากมาย รวมถึงเฟรมเวิร์ก ML เช่น TensorFlow และ PyTorch ซึ่งช่วยให้สามารถสร้างไปป์ไลน์ข้อมูลแบบ end-to-end และเวิร์กโฟลว์ ML ที่ด้านบนของ Ray เป้าหมายคือทำให้การประมวลผลข้อมูลแบบกระจายและ ML ง่ายขึ้นสำหรับผู้ปฏิบัติงานและนักวิจัย

มาดูส่วนของสคริปต์ที่ทำการประมวลผลข้อมูลล่วงหน้านี้กัน เราเริ่มต้นด้วยการโหลดข้อมูลจากที่เก็บคุณลักษณะ:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

จากนั้นเราจะแบ่งและปรับขนาดข้อมูลโดยใช้นามธรรมระดับสูงกว่าที่มีอยู่จาก ray.data ห้องสมุด:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

ชุดข้อมูลการรถไฟที่ประมวลผล การตรวจสอบ และการทดสอบจะถูกจัดเก็บไว้ใน Amazon S3 และจะถูกส่งผ่านเป็นพารามิเตอร์อินพุตไปยังขั้นตอนถัดไป

ดำเนินการฝึกอบรมโมเดลและเพิ่มประสิทธิภาพไฮเปอร์พารามิเตอร์

เมื่อข้อมูลของเราได้รับการประมวลผลล่วงหน้าและพร้อมสำหรับการสร้างแบบจำลอง ก็ถึงเวลาฝึกโมเดล ML บางรุ่นและปรับแต่งไฮเปอร์พารามิเตอร์เพื่อเพิ่มประสิทธิภาพในการคาดการณ์ให้สูงสุด เราใช้ XGBoost-เรย์ซึ่งเป็นแบ็กเอนด์แบบกระจายสำหรับ XGBoost ที่สร้างขึ้นบน Ray ที่ช่วยให้สามารถฝึกฝนโมเดล XGBoost บนชุดข้อมูลขนาดใหญ่โดยใช้หลายโหนดและ GPU โดยให้การแทนที่แบบดรอปอินอย่างง่ายดายสำหรับรถไฟของ XGBoost และการคาดการณ์ API ในขณะเดียวกันก็จัดการกับความซับซ้อนของการจัดการข้อมูลแบบกระจายและการฝึกอบรมภายใต้ประทุน

เพื่อเปิดใช้งานการกระจายการฝึกบนหลายโหนด เราใช้คลาสตัวช่วยที่มีชื่อว่า เรย์เฮลเปอร์. ดังที่แสดงในโค้ดต่อไปนี้ เราใช้การกำหนดค่าทรัพยากรของงานการฝึกอบรมและเลือกโฮสต์แรกเป็นโหนดหลัก:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

เราสามารถใช้ข้อมูลโฮสต์เพื่อตัดสินใจว่าจะเริ่มต้น Ray อย่างไรในแต่ละอินสแตนซ์งานการฝึกอบรม:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

เมื่องานการฝึกอบรมเริ่มต้นขึ้น คลัสเตอร์ Ray จะสามารถเริ่มต้นได้โดยการเรียก start_ray() วิธีการในกรณีของ RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

จากนั้นเราใช้เทรนเนอร์ XGBoost จาก XGBoost-Ray ในการฝึก:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

โปรดทราบว่าในขณะที่สร้างอินสแตนซ์ของ trainerเราผ่าน RayParamsซึ่งใช้จำนวนนักแสดงและจำนวน CPU ต่อนักแสดง XGBoost-Ray ใช้ข้อมูลนี้เพื่อกระจายการฝึกไปยังโหนดทั้งหมดที่เชื่อมต่อกับคลัสเตอร์ Ray

ตอนนี้เราสร้างออบเจ็กต์ตัวประมาณค่า XGBoost ตาม SageMaker Python SDK และใช้สิ่งนั้นสำหรับงาน HPO

จัดเตรียมขั้นตอนก่อนหน้านี้โดยใช้ SageMaker Pipelines

ในการสร้างเวิร์กโฟลว์ ML ที่ปรับขนาดได้ตั้งแต่ต้นทางถึงปลายทางและนำกลับมาใช้ใหม่ได้ เราจำเป็นต้องใช้เครื่องมือ CI/CD เพื่อประสานขั้นตอนก่อนหน้านี้ลงในไปป์ไลน์ SageMaker Pipelines มีการผสานรวมโดยตรงกับ SageMaker, SageMaker Python SDK และ SageMaker Studio การผสานรวมนี้ช่วยให้คุณสร้างเวิร์กโฟลว์ ML ด้วย Python SDK ที่ใช้งานง่าย จากนั้นแสดงภาพและจัดการเวิร์กโฟลว์ของคุณโดยใช้ SageMaker Studio คุณยังสามารถติดตามประวัติข้อมูลของคุณภายในการดำเนินการไปป์ไลน์และกำหนดขั้นตอนสำหรับการแคชได้

SageMaker Pipelines สร้าง Directed Acyclic Graph (DAG) ที่มีขั้นตอนที่จำเป็นในการสร้างเวิร์กโฟลว์ ML แต่ละไปป์ไลน์คือชุดของขั้นตอนที่เชื่อมต่อถึงกันซึ่งควบคุมโดยการขึ้นต่อกันของข้อมูลระหว่างขั้นตอนต่างๆ และสามารถกำหนดพารามิเตอร์ได้ ช่วยให้คุณสามารถระบุตัวแปรอินพุตเป็นพารามิเตอร์สำหรับการทำงานของไปป์ไลน์แต่ละครั้ง SageMaker Pipelines มีพารามิเตอร์ไปป์ไลน์สี่ประเภท: ParameterString, ParameterInteger, ParameterFloatและ ParameterBoolean. ในส่วนนี้ เราจะกำหนดพารามิเตอร์ตัวแปรอินพุตบางส่วนและตั้งค่าการกำหนดค่าการแคชขั้นตอน:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

เรากำหนดขั้นตอนการประมวลผลสองขั้นตอน: ขั้นตอนหนึ่งสำหรับการนำเข้า SageMaker Feature Store และอีกขั้นตอนสำหรับการเตรียมข้อมูล สิ่งนี้ควรมีลักษณะคล้ายกับขั้นตอนก่อนหน้านี้ที่อธิบายไว้ก่อนหน้านี้มาก รหัสใหม่เพียงบรรทัดเดียวคือ ProcessingStep หลังจากคำจำกัดความของขั้นตอน ซึ่งช่วยให้เราสามารถกำหนดค่างานการประมวลผลและรวมเป็นขั้นตอนไปป์ไลน์ได้ นอกจากนี้เรายังระบุการขึ้นต่อกันของขั้นตอนการเตรียมข้อมูลในขั้นตอนการนำเข้า SageMaker Feature Store ดูรหัสต่อไปนี้:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

ในทำนองเดียวกัน ในการสร้างโมเดลการฝึกอบรมและขั้นตอนการปรับแต่ง เราจำเป็นต้องเพิ่มคำจำกัดความของ TuningStep หลังจากโค้ดของขั้นตอนการฝึกโมเดลเพื่อให้เราสามารถรันการปรับแต่งไฮเปอร์พารามิเตอร์ของ SageMaker เป็นขั้นตอนหนึ่งในไปป์ไลน์:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

หลังจากขั้นตอนการปรับแต่ง เราเลือกที่จะลงทะเบียนโมเดลที่ดีที่สุดใน SageMaker Model Registry เพื่อควบคุมคุณภาพของโมเดล เราใช้ประตูคุณภาพขั้นต่ำที่เปรียบเทียบเมตริกวัตถุประสงค์ (RMSE) ของโมเดลที่ดีที่สุดกับเกณฑ์ที่กำหนดเป็นพารามิเตอร์อินพุตของไปป์ไลน์ rmse_threshold. เพื่อทำการประเมินนี้ เราสร้างขั้นตอนการประมวลผลอื่นเพื่อเรียกใช้ สคริปต์การประเมิน. ผลการประเมินแบบจำลองจะถูกจัดเก็บเป็นไฟล์คุณสมบัติ ไฟล์คุณสมบัติมีประโยชน์อย่างยิ่งเมื่อวิเคราะห์ผลลัพธ์ของขั้นตอนการประมวลผล เพื่อตัดสินใจว่าควรรันขั้นตอนอื่นๆ อย่างไร ดูรหัสต่อไปนี้:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

เรากำหนด a ModelStep เพื่อลงทะเบียนโมเดลที่ดีที่สุดใน SageMaker Model Registry ในไปป์ไลน์ของเรา ในกรณีที่รุ่นที่ดีที่สุดไม่ผ่านการตรวจสอบคุณภาพที่กำหนดไว้ เราจะระบุ a เพิ่มเติม FailStep เพื่อส่งออกข้อความแสดงข้อผิดพลาด:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

ต่อไปเราใช้ a ConditionStep เพื่อประเมินว่าขั้นตอนการลงทะเบียนโมเดลหรือขั้นตอนความล้มเหลวควรดำเนินการต่อไปในไปป์ไลน์ ในกรณีของเรา โมเดลที่ดีที่สุดจะถูกลงทะเบียนหากคะแนน RMSE ต่ำกว่าเกณฑ์

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

สุดท้ายนี้ เราจะเรียบเรียงขั้นตอนที่กำหนดไว้ทั้งหมดลงในไปป์ไลน์:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

ไปป์ไลน์ก่อนหน้าสามารถแสดงภาพและดำเนินการได้โดยตรงใน SageMaker Studio หรือดำเนินการโดยการเรียก execution = training_pipeline.start(). รูปภาพต่อไปนี้แสดงการไหลของไปป์ไลน์

ไปป์ไลน์ SageMaker DAG

นอกจากนี้ เรายังสามารถตรวจสอบลำดับวงศ์ตระกูลของสิ่งประดิษฐ์ที่สร้างขึ้นโดยการดำเนินการไปป์ไลน์ได้

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

ปรับใช้โมเดล

หลังจากที่โมเดลที่ดีที่สุดได้รับการลงทะเบียนใน SageMaker Model Registry ผ่านการรันไปป์ไลน์ เราจะปรับใช้โมเดลกับตำแหน่งข้อมูลแบบเรียลไทม์โดยใช้ความสามารถในการปรับใช้โมเดลที่มีการจัดการเต็มรูปแบบของ SageMaker SageMaker มีตัวเลือกการปรับใช้โมเดลอื่นๆ เพื่อตอบสนองความต้องการของกรณีการใช้งานที่แตกต่างกัน สำหรับรายละเอียด โปรดดูที่ ปรับใช้แบบจำลองเพื่อการอนุมาน เมื่อเลือกตัวเลือกที่เหมาะสมสำหรับกรณีการใช้งานของคุณ ก่อนอื่น เรามาลงทะเบียนโมเดลใน SageMaker Model Registry กันก่อน:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

สถานะปัจจุบันของโมเดลคือ PendingApproval. เราจำเป็นต้องกำหนดสถานะเป็น Approved ก่อนการใช้งาน:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

ทำความสะอาด

หลังจากคุณทดลองเสร็จแล้ว อย่าลืมทำความสะอาดทรัพยากรเพื่อหลีกเลี่ยงค่าใช้จ่ายที่ไม่จำเป็น หากต้องการล้างข้อมูล ให้ลบตำแหน่งข้อมูลเรียลไทม์ กลุ่มโมเดล ไปป์ไลน์ และกลุ่มฟีเจอร์โดยการเรียก API ลบจุดสิ้นสุด, ลบModelPackageGroup, ลบไปป์ไลน์และ ลบFeatureGroupตามลำดับ และปิดอินสแตนซ์สมุดบันทึก SageMaker Studio ทั้งหมด

สรุป

โพสต์นี้สาธิตคำแนะนำแบบทีละขั้นตอนเกี่ยวกับวิธีใช้ SageMaker Pipelines เพื่อประสานเวิร์กโฟลว์ ML ที่ใช้ Ray นอกจากนี้เรายังสาธิตความสามารถของ SageMaker Pipelines ในการผสานรวมกับเครื่องมือ ML ของบริษัทอื่นอีกด้วย มีบริการต่างๆ ของ AWS ที่รองรับปริมาณงานของ Ray ในรูปแบบที่ปรับขนาดได้และปลอดภัย เพื่อให้มั่นใจถึงประสิทธิภาพที่เป็นเลิศและประสิทธิภาพในการดำเนินงาน ตอนนี้ถึงตาคุณแล้วที่จะสำรวจความสามารถอันทรงพลังเหล่านี้ และเริ่มเพิ่มประสิทธิภาพเวิร์กโฟลว์ Machine Learning ของคุณด้วย Amazon SageMaker Pipelines และ Ray ดำเนินการวันนี้และปลดล็อกศักยภาพสูงสุดของโปรเจ็กต์ ML ของคุณ!


เกี่ยวกับผู้เขียน

จัดระเบียบเวิร์กโฟลว์แมชชีนเลิร์นนิงที่ใช้ Ray โดยใช้ Amazon SageMaker | Amazon Web Services PlatoBlockchain ข้อมูลอัจฉริยะ ค้นหาแนวตั้ง AI.ราจู รังกัน เป็นสถาปนิกโซลูชันอาวุโสที่ Amazon Web Services (AWS) เขาทำงานร่วมกับหน่วยงานที่ได้รับการสนับสนุนจากรัฐบาล โดยช่วยสร้างโซลูชัน AI/ML โดยใช้ AWS เมื่อไม่ได้ปรับแต่งโซลูชันบนคลาวด์ คุณจะพบว่าเขาไปเที่ยวกับครอบครัวหรือทุบเบอร์ดี้ในเกมแบดมินตันที่มีชีวิตชีวากับเพื่อน ๆ

จัดระเบียบเวิร์กโฟลว์แมชชีนเลิร์นนิงที่ใช้ Ray โดยใช้ Amazon SageMaker | Amazon Web Services PlatoBlockchain ข้อมูลอัจฉริยะ ค้นหาแนวตั้ง AI.เชอรี่ ดิง เป็นสถาปนิกโซลูชันผู้เชี่ยวชาญ AI/ML อาวุโสที่ Amazon Web Services (AWS) เธอมีประสบการณ์มากมายในด้านการเรียนรู้ของเครื่องโดยสำเร็จการศึกษาระดับปริญญาเอกสาขาวิทยาการคอมพิวเตอร์ เธอทำงานร่วมกับลูกค้าภาครัฐเป็นหลักเกี่ยวกับความท้าทายทางธุรกิจต่างๆ ที่เกี่ยวข้องกับ AI/ML ช่วยให้พวกเขาเร่งการเดินทางการเรียนรู้ของเครื่องบน AWS Cloud เมื่อไม่ช่วยเหลือลูกค้าเธอก็ชอบกิจกรรมกลางแจ้ง

ประทับเวลา:

เพิ่มเติมจาก AWS Machine Learning AWS