Ενορχηστρώστε ροές εργασιών μηχανικής εκμάθησης που βασίζονται σε Ray χρησιμοποιώντας το Amazon SageMaker | Υπηρεσίες Ιστού της Amazon

Ενορχηστρώστε ροές εργασιών μηχανικής εκμάθησης που βασίζονται σε Ray χρησιμοποιώντας το Amazon SageMaker | Υπηρεσίες Ιστού της Amazon

Η μηχανική μάθηση (ML) γίνεται όλο και πιο περίπλοκη καθώς οι πελάτες προσπαθούν να λύσουν όλο και πιο προκλητικά προβλήματα. Αυτή η πολυπλοκότητα οδηγεί συχνά στην ανάγκη για κατανεμημένη ML, όπου πολλαπλές μηχανές χρησιμοποιούνται για την εκπαίδευση ενός μόνο μοντέλου. Αν και αυτό επιτρέπει την παραλληλοποίηση των εργασιών σε πολλούς κόμβους, οδηγώντας σε επιταχυνόμενους χρόνους εκπαίδευσης, βελτιωμένη επεκτασιμότητα και βελτιωμένη απόδοση, υπάρχουν σημαντικές προκλήσεις στην αποτελεσματική χρήση κατανεμημένου υλικού. Οι επιστήμονες δεδομένων πρέπει να αντιμετωπίσουν προκλήσεις όπως η κατάτμηση δεδομένων, η εξισορρόπηση φορτίου, η ανοχή σφαλμάτων και η επεκτασιμότητα. Οι μηχανικοί ML πρέπει να χειρίζονται την παραλληλοποίηση, τον προγραμματισμό, τα σφάλματα και τις επαναλήψεις με μη αυτόματο τρόπο, απαιτώντας πολύπλοκο κώδικα υποδομής.

Σε αυτήν την ανάρτηση, συζητάμε τα οφέλη της χρήσης ακτίνα και Amazon Sage Maker για κατανεμημένη ML και παρέχετε έναν οδηγό βήμα προς βήμα για τον τρόπο χρήσης αυτών των πλαισίων για τη δημιουργία και την ανάπτυξη μιας επεκτάσιμης ροής εργασίας ML.

Το Ray, ένα κατανεμημένο υπολογιστικό πλαίσιο ανοιχτού κώδικα, παρέχει ένα ευέλικτο πλαίσιο για κατανεμημένη εκπαίδευση και εξυπηρέτηση μοντέλων ML. Αφαιρεί τις λεπτομέρειες κατανεμημένου συστήματος χαμηλού επιπέδου μέσω απλών, επεκτάσιμων βιβλιοθηκών για κοινές εργασίες ML, όπως η προεπεξεργασία δεδομένων, η κατανεμημένη εκπαίδευση, ο συντονισμός υπερπαραμέτρων, η εκμάθηση ενίσχυσης και η εξυπηρέτηση μοντέλων.

Το SageMaker είναι μια πλήρως διαχειριζόμενη υπηρεσία για την κατασκευή, την εκπαίδευση και την ανάπτυξη μοντέλων ML. Το Ray ενσωματώνεται απρόσκοπτα με τις δυνατότητες του SageMaker για τη δημιουργία και την ανάπτυξη πολύπλοκων φόρτων εργασίας ML που είναι ταυτόχρονα αποτελεσματικοί και αξιόπιστοι. Ο συνδυασμός Ray και SageMaker παρέχει δυνατότητες από άκρο σε άκρο για κλιμακούμενες ροές εργασιών ML και έχει τα ακόλουθα τονισμένα χαρακτηριστικά:

  • Οι κατανεμημένοι ηθοποιοί και οι κατασκευές παραλληλισμού στο Ray απλοποιούν την ανάπτυξη κατανεμημένων εφαρμογών.
  • Το Ray AI Runtime (AIR) μειώνει την τριβή κατά τη μετάβαση από την ανάπτυξη στην παραγωγή. Με το Ray και το AIR, ο ίδιος κώδικας Python μπορεί να κλιμακωθεί απρόσκοπτα από ένα φορητό υπολογιστή σε ένα μεγάλο σύμπλεγμα.
  • Η διαχειριζόμενη υποδομή του SageMaker και λειτουργίες όπως εργασίες επεξεργασίας, εργασίες εκπαίδευσης και εργασίες συντονισμού υπερπαραμέτρων μπορούν να χρησιμοποιούν βιβλιοθήκες Ray από κάτω για κατανεμημένους υπολογιστές.
  • Πειράματα Amazon SageMaker επιτρέπει την ταχεία επανάληψη και παρακολούθηση των δοκιμών.
  • Κατάστημα χαρακτηριστικών Amazon SageMaker παρέχει ένα επεκτάσιμο αποθετήριο για αποθήκευση, ανάκτηση και κοινή χρήση χαρακτηριστικών ML για εκπαίδευση μοντέλων.
  • Τα εκπαιδευμένα μοντέλα μπορούν να αποθηκευτούν, να εκδοθούν και να παρακολουθηθούν Μητρώο μοντέλων Amazon SageMaker για διακυβέρνηση και διαχείριση.
  • Αγωγοί Amazon SageMaker επιτρέπει την ενορχήστρωση του κύκλου ζωής ML από άκρο σε άκρο από την προετοιμασία και την εκπαίδευση δεδομένων έως την ανάπτυξη μοντέλων ως αυτοματοποιημένες ροές εργασίας.

Επισκόπηση λύσεων

Αυτή η ανάρτηση εστιάζει στα οφέλη από τη χρήση του Ray και του SageMaker μαζί. Ρυθμίσαμε μια ροή εργασιών ML βασισμένη σε ακτίνες από άκρο σε άκρο, ενορχηστρωμένη με χρήση των αγωγών SageMaker. Η ροή εργασίας περιλαμβάνει παράλληλη απορρόφηση δεδομένων στο χώρο αποθήκευσης χαρακτηριστικών χρησιμοποιώντας Ray actors, προεπεξεργασία δεδομένων με Ray Data, μοντέλα εκπαίδευσης και συντονισμό υπερπαραμέτρων σε κλίμακα χρησιμοποιώντας εργασίες συντονισμού Ray Train και βελτιστοποίησης υπερπαραμέτρων (HPO), και τέλος αξιολόγηση μοντέλου και καταχώρηση του μοντέλου σε μητρώου μοντέλου.

Για τα δεδομένα μας χρησιμοποιούμε ένα συνθετικό σύνολο δεδομένων στέγασης που αποτελείται από οκτώ χαρακτηριστικά (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, να DECK) και το μοντέλο μας θα προβλέψει το PRICE του σπιτιού.

Κάθε στάδιο στη ροή εργασίας ML χωρίζεται σε διακριτά βήματα, με το δικό του σενάριο που λαμβάνει παραμέτρους εισόδου και εξόδου. Στην επόμενη ενότητα, επισημαίνουμε αποσπάσματα κώδικα κλειδιού από κάθε βήμα. Ο πλήρης κωδικός βρίσκεται στο αποθετήριο aws-samples-for-ray GitHub.

Προϋποθέσεις

Για να χρησιμοποιήσετε το SageMaker Python SDK και να εκτελέσετε τον κώδικα που σχετίζεται με αυτήν την ανάρτηση, χρειάζεστε τις ακόλουθες προϋποθέσεις:

Απορροφήστε δεδομένα στο SageMaker Feature Store

Το πρώτο βήμα στη ροή εργασίας ML είναι να διαβάσετε το αρχείο δεδομένων προέλευσης από Απλή υπηρεσία αποθήκευσης Amazon (Amazon S3) σε μορφή CSV και μεταφέρετέ το στο SageMaker Feature Store. Το SageMaker Feature Store είναι ένα ειδικά κατασκευασμένο αποθετήριο που διευκολύνει τις ομάδες να δημιουργούν, να μοιράζονται και να διαχειρίζονται χαρακτηριστικά ML. Απλοποιεί την ανακάλυψη, την επαναχρησιμοποίηση και την κοινή χρήση χαρακτηριστικών, οδηγώντας σε ταχύτερη ανάπτυξη, αυξημένη συνεργασία μεταξύ των ομάδων πελατών και μειωμένο κόστος.

Η απορρόφηση λειτουργιών στο χώρο αποθήκευσης χαρακτηριστικών περιλαμβάνει τα ακόλουθα βήματα:

  1. Καθορίστε μια ομάδα δυνατοτήτων και δημιουργήστε την ομάδα δυνατοτήτων στο χώρο αποθήκευσης δυνατοτήτων.
  2. Προετοιμάστε τα δεδομένα προέλευσης για το χώρο αποθήκευσης δυνατοτήτων προσθέτοντας μια ώρα συμβάντος και ένα αναγνωριστικό εγγραφής για κάθε σειρά δεδομένων.
  3. Εισαγάγετε τα έτοιμα δεδομένα στην ομάδα δυνατοτήτων χρησιμοποιώντας το Boto3 SDK.

Σε αυτήν την ενότητα, επισημαίνουμε μόνο το Βήμα 3, επειδή αυτό είναι το μέρος που περιλαμβάνει παράλληλη επεξεργασία της εργασίας απορρόφησης χρησιμοποιώντας το Ray. Μπορείτε να δείτε τον πλήρη κώδικα για αυτήν τη διαδικασία στο GitHub repo.

Η ingest_features Η μέθοδος ορίζεται μέσα σε μια κλάση που ονομάζεται Featurestore. Σημειώστε ότι το Featurestore η τάξη είναι διακοσμημένη με @ray.remote. Αυτό υποδηλώνει ότι ένα στιγμιότυπο αυτής της κλάσης είναι ένας παράγοντας Ray, μια κατάσταση και ταυτόχρονη υπολογιστική μονάδα εντός του Ray. Είναι ένα μοντέλο προγραμματισμού που σας επιτρέπει να δημιουργείτε κατανεμημένα αντικείμενα που διατηρούν μια εσωτερική κατάσταση και μπορούν να προσπελαστούν ταυτόχρονα από πολλαπλές εργασίες που εκτελούνται σε διαφορετικούς κόμβους σε ένα σύμπλεγμα Ray. Οι ηθοποιοί παρέχουν έναν τρόπο διαχείρισης και ενθυλάκωσης της μεταβλητής κατάστασης, καθιστώντας τους πολύτιμους για τη δημιουργία πολύπλοκων, κρατικών εφαρμογών σε ένα κατανεμημένο περιβάλλον. Μπορείτε επίσης να καθορίσετε απαιτήσεις πόρων σε ηθοποιούς. Σε αυτή την περίπτωση, κάθε περίπτωση του FeatureStore η τάξη θα απαιτήσει 0.5 CPU. Δείτε τον παρακάτω κώδικα:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

Μπορείτε να αλληλεπιδράσετε με τον ηθοποιό καλώντας το remote χειριστής. Στον παρακάτω κώδικα, ο επιθυμητός αριθμός ηθοποιών μεταβιβάζεται ως όρισμα εισόδου στο σενάριο. Στη συνέχεια, τα δεδομένα διαμερίζονται με βάση τον αριθμό των ηθοποιών και μεταβιβάζονται στις απομακρυσμένες παράλληλες διεργασίες για να εισαχθούν στο χώρο αποθήκευσης χαρακτηριστικών. Μπορείς να καλέσεις get στο αντικείμενο ref για να αποκλείσετε την εκτέλεση της τρέχουσας εργασίας μέχρι να ολοκληρωθεί ο απομακρυσμένος υπολογισμός και να είναι διαθέσιμο το αποτέλεσμα. Όταν το αποτέλεσμα είναι διαθέσιμο, ray.get θα επιστρέψει το αποτέλεσμα και η εκτέλεση της τρέχουσας εργασίας θα συνεχιστεί.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

Προετοιμάστε δεδομένα για εκπαίδευση, επικύρωση και δοκιμή

Σε αυτό το βήμα, χρησιμοποιούμε Ray Dataset για να διαχωρίσουμε, να μετασχηματίσουμε και να κλιμακώσουμε αποτελεσματικά το σύνολο δεδομένων μας στο πλαίσιο της προετοιμασίας για τη μηχανική εκμάθηση. Το Ray Dataset παρέχει έναν τυπικό τρόπο φόρτωσης κατανεμημένων δεδομένων στο Ray, υποστηρίζοντας διάφορα συστήματα αποθήκευσης και μορφές αρχείων. Διαθέτει API για κοινές λειτουργίες προεπεξεργασίας δεδομένων ML όπως παράλληλοι μετασχηματισμοί, ανακάτεμα, ομαδοποίηση και συναθροίσεις. Το Ray Dataset χειρίζεται επίσης λειτουργίες που απαιτούν ρύθμιση κατάστασης και επιτάχυνση GPU. Ενσωματώνεται ομαλά με άλλες βιβλιοθήκες επεξεργασίας δεδομένων όπως το Spark, το Pandas, το NumPy και άλλα, καθώς και με πλαίσια ML όπως το TensorFlow και το PyTorch. Αυτό επιτρέπει τη δημιουργία αγωγών δεδομένων από άκρο σε άκρο και ροές εργασίας ML πάνω από το Ray. Ο στόχος είναι να διευκολυνθεί η κατανεμημένη επεξεργασία δεδομένων και η ML για τους επαγγελματίες και τους ερευνητές.

Ας δούμε ενότητες των σεναρίων που εκτελούν αυτήν την προεπεξεργασία δεδομένων. Ξεκινάμε φορτώνοντας τα δεδομένα από το κατάστημα χαρακτηριστικών:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

Στη συνέχεια διαχωρίζουμε και κλιμακώνουμε τα δεδομένα χρησιμοποιώντας τις αφαιρέσεις υψηλότερου επιπέδου που είναι διαθέσιμες από το ray.data βιβλιοθήκη:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

Τα επεξεργασμένα σύνολα δεδομένων αμαξοστοιχίας, επικύρωσης και δοκιμής αποθηκεύονται στο Amazon S3 και θα περάσουν ως παράμετροι εισόδου στα επόμενα βήματα.

Εκτελέστε εκπαίδευση μοντέλων και βελτιστοποίηση υπερπαραμέτρων

Με τα δεδομένα μας προεπεξεργασμένα και έτοιμα για μοντελοποίηση, ήρθε η ώρα να εκπαιδεύσουμε ορισμένα μοντέλα ML και να ρυθμίσουμε τις υπερπαραμέτρους τους για να μεγιστοποιήσουμε την προγνωστική απόδοση. Χρησιμοποιούμε XGBoost-Ray, ένα κατανεμημένο backend για το XGBoost που βασίζεται σε Ray που επιτρέπει την εκπαίδευση μοντέλων XGBoost σε μεγάλα σύνολα δεδομένων χρησιμοποιώντας πολλούς κόμβους και GPU. Παρέχει απλές αντικαταστάσεις για το τρένο XGBoost και προβλέψεις API, ενώ χειρίζεται την πολυπλοκότητα της διαχείρισης κατανεμημένων δεδομένων και της εκπαίδευσης κάτω από την κουκούλα.

Για να ενεργοποιήσουμε τη διανομή της εκπαίδευσης σε πολλούς κόμβους, χρησιμοποιούμε μια βοηθητική κλάση που ονομάζεται RayHelper. Όπως φαίνεται στον παρακάτω κώδικα, χρησιμοποιούμε τη διαμόρφωση πόρων της εργασίας εκπαίδευσης και επιλέγουμε τον πρώτο κεντρικό υπολογιστή ως κύριο κόμβο:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

Μπορούμε να χρησιμοποιήσουμε τις πληροφορίες κεντρικού υπολογιστή για να αποφασίσουμε πώς να αρχικοποιήσουμε το Ray σε κάθε μία από τις περιπτώσεις εργασίας εκπαίδευσης:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

Όταν ξεκινά μια εργασία εκπαίδευσης, ένα σύμπλεγμα Ray μπορεί να αρχικοποιηθεί καλώντας το start_ray() μέθοδος σε μια περίπτωση του RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

Στη συνέχεια χρησιμοποιούμε τον εκπαιδευτή XGBoost από την XGBoost-Ray για εκπαίδευση:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

Σημειώστε ότι κατά τη στιγμιοποίηση του trainer, περνάμε RayParams, το οποίο λαμβάνει τον αριθμό των παραγόντων και τον αριθμό των CPU ανά ηθοποιούς. Το XGBoost-Ray χρησιμοποιεί αυτές τις πληροφορίες για να διανείμει την εκπαίδευση σε όλους τους κόμβους που είναι συνδεδεμένοι στο σύμπλεγμα Ray.

Τώρα δημιουργούμε ένα αντικείμενο εκτιμητή XGBoost με βάση το SageMaker Python SDK και το χρησιμοποιούμε για την εργασία HPO.

Ενορχηστρώστε τα προηγούμενα βήματα χρησιμοποιώντας το SageMaker Pipelines

Για να δημιουργήσουμε μια επεκτάσιμη και επαναχρησιμοποιήσιμη ροή εργασίας ML από άκρο σε άκρο, πρέπει να χρησιμοποιήσουμε ένα εργαλείο CI/CD για να ενορχηστρώσουμε τα προηγούμενα βήματα σε μια διοχέτευση. Το SageMaker Pipelines έχει άμεση ενοποίηση με το SageMaker, το SageMaker Python SDK και το SageMaker Studio. Αυτή η ενοποίηση σάς επιτρέπει να δημιουργείτε ροές εργασίας ML με ένα εύχρηστο Python SDK και, στη συνέχεια, να οπτικοποιείτε και να διαχειρίζεστε τη ροή εργασίας σας χρησιμοποιώντας το SageMaker Studio. Μπορείτε επίσης να παρακολουθείτε το ιστορικό των δεδομένων σας εντός της εκτέλεσης του αγωγού και να ορίσετε βήματα για την προσωρινή αποθήκευση.

Το SageMaker Pipelines δημιουργεί ένα Directed Acyclic Graph (DAG) που περιλαμβάνει βήματα που απαιτούνται για τη δημιουργία μιας ροής εργασίας ML. Κάθε σωλήνωση είναι μια σειρά διασυνδεδεμένων βημάτων που ενορχηστρώνονται από εξαρτήσεις δεδομένων μεταξύ των βημάτων και μπορεί να παραμετροποιηθεί, επιτρέποντάς σας να παρέχετε μεταβλητές εισόδου ως παραμέτρους για κάθε εκτέλεση του αγωγού. Το SageMaker Pipelines έχει τέσσερις τύπους παραμέτρων αγωγών: ParameterString, ParameterInteger, ParameterFloat, να ParameterBoolean. Σε αυτήν την ενότητα, παραμετροποιούμε ορισμένες από τις μεταβλητές εισόδου και ρυθμίζουμε τη διαμόρφωση βηματικής προσωρινής αποθήκευσης:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Ορίζουμε δύο βήματα επεξεργασίας: το ένα για την απορρόφηση του SageMaker Feature Store και το άλλο για την προετοιμασία δεδομένων. Αυτό θα πρέπει να μοιάζει πολύ με τα προηγούμενα βήματα που περιγράφηκαν προηγουμένως. Η μόνη νέα γραμμή κώδικα είναι η ProcessingStep μετά τον ορισμό των βημάτων, που μας επιτρέπει να λάβουμε τη διαμόρφωση της εργασίας επεξεργασίας και να τη συμπεριλάβουμε ως βήμα διοχέτευσης. Καθορίζουμε περαιτέρω την εξάρτηση του βήματος προετοιμασίας δεδομένων από το βήμα απορρόφησης του SageMaker Feature Store. Δείτε τον παρακάτω κώδικα:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Παρομοίως, για να δημιουργήσουμε ένα πρότυπο βήμα εκπαίδευσης και συντονισμού, πρέπει να προσθέσουμε έναν ορισμό του TuningStep μετά τον κώδικα του βήματος εκπαίδευσης του μοντέλου για να μας επιτρέψει να εκτελέσουμε τον συντονισμό υπερπαραμέτρων του SageMaker ως ένα βήμα στη διαδικασία:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

Μετά το βήμα συντονισμού, επιλέγουμε να καταχωρήσουμε το καλύτερο μοντέλο στο SageMaker Model Registry. Για να ελέγξουμε την ποιότητα του μοντέλου, εφαρμόζουμε μια πύλη ελάχιστης ποιότητας που συγκρίνει την αντικειμενική μέτρηση του καλύτερου μοντέλου (RMSE) με ένα όριο που ορίζεται ως η παράμετρος εισόδου του αγωγού rmse_threshold. Για να κάνουμε αυτήν την αξιολόγηση, δημιουργούμε ένα άλλο βήμα επεξεργασίας για να εκτελέσουμε ένα σενάριο αξιολόγησης. Το αποτέλεσμα αξιολόγησης μοντέλου θα αποθηκευτεί ως αρχείο ιδιοκτησίας. Τα αρχεία ιδιοτήτων είναι ιδιαίτερα χρήσιμα κατά την ανάλυση των αποτελεσμάτων ενός βήματος επεξεργασίας για να αποφασίσετε πώς θα εκτελεστούν άλλα βήματα. Δείτε τον παρακάτω κώδικα:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

Ορίζουμε α ModelStep για να καταχωρήσετε το καλύτερο μοντέλο στο Μητρώο Μοντέλων του SageMaker στη διάθεσή μας. Σε περίπτωση που το καλύτερο μοντέλο δεν περάσει τον προκαθορισμένο ποιοτικό μας έλεγχο, καθορίζουμε επιπλέον α FailStep για την έξοδο ενός μηνύματος σφάλματος:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

Στη συνέχεια, χρησιμοποιούμε α ConditionStep για να αξιολογήσει εάν το βήμα καταχώρισης του μοντέλου ή το βήμα αποτυχίας θα πρέπει να ακολουθηθεί στη συνέχεια. Στην περίπτωσή μας, το καλύτερο μοντέλο θα καταχωρηθεί εάν η βαθμολογία RMSE του είναι χαμηλότερη από το όριο.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

Τέλος, ενορχηστρώνουμε όλα τα καθορισμένα βήματα σε μια διοχέτευση:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

Η προηγούμενη διοχέτευση μπορεί να οπτικοποιηθεί και να εκτελεστεί απευθείας στο SageMaker Studio ή να εκτελεστεί με κλήση execution = training_pipeline.start(). Το παρακάτω σχήμα απεικονίζει τη ροή του αγωγού.

SageMaker Pipeline DAG

Επιπλέον, μπορούμε να ελέγξουμε τη σειρά των τεχνουργημάτων που δημιουργούνται από την εκτέλεση του αγωγού.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

Αναπτύξτε το μοντέλο

Αφού το καλύτερο μοντέλο καταχωρηθεί στο SageMaker Model Registry μέσω μιας διοχέτευσης, αναπτύσσουμε το μοντέλο σε ένα τελικό σημείο σε πραγματικό χρόνο, χρησιμοποιώντας τις πλήρως διαχειριζόμενες δυνατότητες ανάπτυξης μοντέλου του SageMaker. Το SageMaker έχει άλλες επιλογές ανάπτυξης μοντέλων για να καλύψει τις ανάγκες διαφορετικών περιπτώσεων χρήσης. Για λεπτομέρειες, ανατρέξτε στο Αναπτύξτε μοντέλα για συμπεράσματα όταν επιλέγετε τη σωστή επιλογή για την περίπτωση χρήσης σας. Αρχικά, ας εγγραφεί το μοντέλο στο μητρώο μοντέλων SageMaker:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

Η τρέχουσα κατάσταση του μοντέλου είναι PendingApproval. Πρέπει να ορίσουμε την κατάστασή του σε Approved πριν από την ανάπτυξη:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

εκκαθάριση

Αφού ολοκληρώσετε τον πειραματισμό, θυμηθείτε να καθαρίσετε τους πόρους για να αποφύγετε περιττές χρεώσεις. Για καθαρισμό, διαγράψτε το τελικό σημείο σε πραγματικό χρόνο, την ομάδα μοντέλων, τη διοχέτευση και την ομάδα χαρακτηριστικών καλώντας τα API DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline, να DeleteFeatureGroup, αντίστοιχα, και τερματίστε όλες τις παρουσίες φορητών υπολογιστών SageMaker Studio.

Συμπέρασμα

Αυτή η ανάρτηση παρουσίασε μια αναλυτική περιγραφή του τρόπου χρήσης του SageMaker Pipelines για την ενορχήστρωση ροών εργασίας ML που βασίζονται σε Ray. Επιδείξαμε επίσης την ικανότητα του SageMaker Pipelines να ενσωματώνεται με εργαλεία ML τρίτων. Υπάρχουν διάφορες υπηρεσίες AWS που υποστηρίζουν φόρτους εργασίας Ray με επεκτάσιμο και ασφαλή τρόπο, ώστε να διασφαλίζεται η αριστεία απόδοσης και η λειτουργική αποδοτικότητα. Τώρα, είναι η σειρά σας να εξερευνήσετε αυτές τις ισχυρές δυνατότητες και να αρχίσετε να βελτιστοποιείτε τις ροές εργασιών μηχανικής εκμάθησης με το Amazon SageMaker Pipelines and Ray. Λάβετε δράση σήμερα και ξεκλειδώστε πλήρως τις δυνατότητες των έργων σας ML!


Σχετικά με το Συγγραφέας

Ενορχηστρώστε ροές εργασιών μηχανικής μάθησης που βασίζονται σε Ray χρησιμοποιώντας το Amazon SageMaker | Υπηρεσίες Ιστού Amazon PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Ράτζου Ράνγκαν είναι Senior Solutions Architect στο Amazon Web Services (AWS). Συνεργάζεται με κρατικές οντότητες, βοηθώντας τους να δημιουργήσουν λύσεις AI/ML χρησιμοποιώντας AWS. Όταν δεν ασχολείστε με τις λύσεις cloud, θα τον πιάσετε να κάνει παρέα με την οικογένεια ή να σπάει πουλάκια σε ένα ζωντανό παιχνίδι μπάντμιντον με φίλους.

Ενορχηστρώστε ροές εργασιών μηχανικής μάθησης που βασίζονται σε Ray χρησιμοποιώντας το Amazon SageMaker | Υπηρεσίες Ιστού Amazon PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Σέρι Ντινγκ είναι ανώτερος αρχιτέκτονας λύσεων ειδικός σε AI/ML στην Amazon Web Services (AWS). Έχει μεγάλη εμπειρία στη μηχανική μάθηση με διδακτορικό στην επιστήμη των υπολογιστών. Συνεργάζεται κυρίως με πελάτες του δημόσιου τομέα σε διάφορες επιχειρηματικές προκλήσεις που σχετίζονται με το AI/ML, βοηθώντας τους να επιταχύνουν το ταξίδι μηχανικής εκμάθησης στο AWS Cloud. Όταν δεν βοηθά τους πελάτες, απολαμβάνει τις υπαίθριες δραστηριότητες.

Σφραγίδα ώρας:

Περισσότερα από Μηχανική εκμάθηση AWS