Orchestrieren Sie Ray-basierte Arbeitsabläufe für maschinelles Lernen mit Amazon SageMaker | Amazon Web Services

Orchestrieren Sie Ray-basierte Arbeitsabläufe für maschinelles Lernen mit Amazon SageMaker | Amazon Web Services

Maschinelles Lernen (ML) wird immer komplexer, da Kunden versuchen, immer anspruchsvollere Probleme zu lösen. Diese Komplexität führt häufig dazu, dass verteiltes ML erforderlich ist, bei dem mehrere Maschinen zum Trainieren eines einzelnen Modells verwendet werden. Obwohl dies eine Parallelisierung von Aufgaben über mehrere Knoten hinweg ermöglicht, was zu kürzeren Trainingszeiten, verbesserter Skalierbarkeit und verbesserter Leistung führt, gibt es erhebliche Herausforderungen bei der effektiven Nutzung verteilter Hardware. Datenwissenschaftler müssen sich Herausforderungen wie Datenpartitionierung, Lastausgleich, Fehlertoleranz und Skalierbarkeit stellen. ML-Ingenieure müssen Parallelisierung, Planung, Fehler und Wiederholungsversuche manuell handhaben, was einen komplexen Infrastrukturcode erfordert.

In diesem Beitrag diskutieren wir die Vorteile der Verwendung Strahl und Amazon Sage Maker für verteiltes ML und bieten eine Schritt-für-Schritt-Anleitung zur Verwendung dieser Frameworks zum Erstellen und Bereitstellen eines skalierbaren ML-Workflows.

Ray, ein Open-Source-Framework für verteiltes Computing, bietet ein flexibles Framework für verteiltes Training und Bereitstellung von ML-Modellen. Es abstrahiert Details verteilter Systeme auf niedriger Ebene durch einfache, skalierbare Bibliotheken für gängige ML-Aufgaben wie Datenvorverarbeitung, verteiltes Training, Hyperparameter-Tuning, Reinforcement Learning und Modellbereitstellung.

SageMaker ist ein vollständig verwalteter Dienst zum Erstellen, Trainieren und Bereitstellen von ML-Modellen. Ray lässt sich nahtlos in die SageMaker-Funktionen integrieren, um komplexe ML-Workloads zu erstellen und bereitzustellen, die sowohl effizient als auch zuverlässig sind. Die Kombination von Ray und SageMaker bietet End-to-End-Funktionen für skalierbare ML-Workflows und verfügt über die folgenden hervorgehobenen Funktionen:

  • Verteilte Akteure und Parallelitätskonstrukte in Ray vereinfachen die Entwicklung verteilter Anwendungen.
  • Ray AI Runtime (AIR) reduziert die Reibung beim Übergang von der Entwicklung zur Produktion. Mit Ray und AIR kann derselbe Python-Code nahtlos von einem Laptop auf einen großen Cluster skaliert werden.
  • Die verwaltete Infrastruktur von SageMaker und Funktionen wie Verarbeitungsjobs, Trainingsjobs und Hyperparameter-Tuning-Jobs können die darunter liegenden Ray-Bibliotheken für verteiltes Computing nutzen.
  • Amazon SageMaker-Experimente ermöglicht die schnelle Iteration und Verfolgung von Versuchen.
  • Amazon SageMaker Feature Store Bietet ein skalierbares Repository zum Speichern, Abrufen und Teilen von ML-Funktionen für das Modelltraining.
  • Trainierte Modelle können gespeichert, versioniert und nachverfolgt werden Amazon SageMaker-Modellregistrierung für Governance und Management.
  • Amazon SageMaker-Pipelines ermöglicht die Orchestrierung des End-to-End-ML-Lebenszyklus von der Datenvorbereitung und Schulung bis hin zur Modellbereitstellung als automatisierte Workflows.

Lösungsüberblick

Dieser Beitrag konzentriert sich auf die Vorteile der gemeinsamen Verwendung von Ray und SageMaker. Wir haben einen durchgängigen Ray-basierten ML-Workflow eingerichtet, der mit SageMaker Pipelines orchestriert wurde. Der Workflow umfasst die parallele Aufnahme von Daten in den Feature Store mithilfe von Ray-Akteuren, die Datenvorverarbeitung mit Ray Data, das Trainieren von Modellen und Hyperparameter-Tuning im großen Maßstab mithilfe von Ray Train- und Hyperparameter-Optimierungs-Tuning-Jobs (HPO) sowie schließlich die Modellbewertung und Registrierung des Modells in einem Modellregister.

Für unsere Daten verwenden wir ein synthetischer Wohnungsdatensatz das aus acht Features besteht (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH und DECK) und unser Modell wird das vorhersagen PRICE des Hauses.

Jede Phase im ML-Workflow ist in einzelne Schritte unterteilt, mit einem eigenen Skript, das Eingabe- und Ausgabeparameter entgegennimmt. Im nächsten Abschnitt heben wir wichtige Codeausschnitte aus jedem Schritt hervor. Den vollständigen Code finden Sie auf der aws-samples-for-ray GitHub-Repository.

Voraussetzungen:

Um das SageMaker Python SDK zu verwenden und den mit diesem Beitrag verknüpften Code auszuführen, benötigen Sie die folgenden Voraussetzungen:

Nehmen Sie Daten in den SageMaker Feature Store auf

Der erste Schritt im ML-Workflow besteht darin, die Quelldatendatei zu lesen Amazon Simple Storage-Service (Amazon S3) im CSV-Format und laden Sie es in den SageMaker Feature Store ein. Der SageMaker Feature Store ist ein speziell entwickeltes Repository, das es Teams erleichtert, ML-Funktionen zu erstellen, zu teilen und zu verwalten. Es vereinfacht die Erkennung, Wiederverwendung und gemeinsame Nutzung von Funktionen und führt zu einer schnelleren Entwicklung, einer verbesserten Zusammenarbeit innerhalb der Kundenteams und geringeren Kosten.

Das Aufnehmen von Features in den Feature Store umfasst die folgenden Schritte:

  1. Definieren Sie eine Feature-Gruppe und erstellen Sie die Feature-Gruppe im Feature Store.
  2. Bereiten Sie die Quelldaten für den Feature Store vor, indem Sie für jede Datenzeile eine Ereigniszeit und eine Datensatz-ID hinzufügen.
  3. Nehmen Sie die vorbereiteten Daten mithilfe des Boto3 SDK in die Funktionsgruppe auf.

In diesem Abschnitt heben wir nur Schritt 3 hervor, da dies der Teil ist, der die parallele Verarbeitung der Aufnahmeaufgabe mithilfe von Ray beinhaltet. Den vollständigen Code für diesen Vorgang finden Sie im GitHub Repo.

Das ingest_features Die Methode wird innerhalb einer aufgerufenen Klasse definiert Featurestore. Notiere dass der Featurestore Klasse ist geschmückt mit @ray.remote. Dies weist darauf hin, dass eine Instanz dieser Klasse ein Ray-Akteur ist, eine zustandsbehaftete und gleichzeitige Recheneinheit innerhalb von Ray. Es handelt sich um ein Programmiermodell, mit dem Sie verteilte Objekte erstellen können, die einen internen Status beibehalten und auf die mehrere Aufgaben gleichzeitig zugreifen können, die auf verschiedenen Knoten in einem Ray-Cluster ausgeführt werden. Aktoren bieten eine Möglichkeit, den veränderlichen Zustand zu verwalten und zu kapseln, was sie für die Erstellung komplexer, zustandsbehafteter Anwendungen in einer verteilten Umgebung wertvoll macht. Sie können Ressourcenanforderungen auch in Akteuren angeben. In diesem Fall ist jede Instanz von FeatureStore Die Klasse benötigt 0.5 CPUs. Siehe den folgenden Code:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

Sie können mit dem Schauspieler interagieren, indem Sie anrufen remote Operator. Im folgenden Code wird die gewünschte Anzahl an Akteuren als Eingabeargument an das Skript übergeben. Die Daten werden dann basierend auf der Anzahl der Akteure partitioniert und an die Remote-Parallelprozesse weitergeleitet, um in den Feature Store aufgenommen zu werden. Du kannst anrufen get auf die Objektreferenz, um die Ausführung der aktuellen Aufgabe zu blockieren, bis die Remote-Berechnung abgeschlossen ist und das Ergebnis verfügbar ist. Sobald das Ergebnis vorliegt, ray.get gibt das Ergebnis zurück und die Ausführung der aktuellen Aufgabe wird fortgesetzt.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

Bereiten Sie Daten für Schulung, Validierung und Tests vor

In diesem Schritt verwenden wir Ray Dataset, um unseren Datensatz als Vorbereitung für maschinelles Lernen effizient aufzuteilen, zu transformieren und zu skalieren. Ray Dataset bietet eine Standardmethode zum Laden verteilter Daten in Ray und unterstützt verschiedene Speichersysteme und Dateiformate. Es verfügt über APIs für gängige ML-Datenvorverarbeitungsvorgänge wie parallele Transformationen, Mischen, Gruppieren und Aggregationen. Ray Dataset übernimmt auch Vorgänge, die eine zustandsbehaftete Einrichtung und GPU-Beschleunigung erfordern. Es lässt sich problemlos in andere Datenverarbeitungsbibliotheken wie Spark, Pandas, NumPy und mehr sowie in ML-Frameworks wie TensorFlow und PyTorch integrieren. Dies ermöglicht den Aufbau von End-to-End-Datenpipelines und ML-Workflows auf Ray. Ziel ist es, die verteilte Datenverarbeitung und ML für Praktiker und Forscher einfacher zu machen.

Schauen wir uns Abschnitte der Skripte an, die diese Datenvorverarbeitung durchführen. Wir beginnen mit dem Laden der Daten aus dem Feature Store:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

Anschließend teilen und skalieren wir die Daten mithilfe der Abstraktionen auf höherer Ebene, die im verfügbar sind ray.data Bibliothek:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

Die verarbeiteten Zug-, Validierungs- und Testdatensätze werden in Amazon S3 gespeichert und als Eingabeparameter an nachfolgende Schritte übergeben.

Führen Sie Modelltraining und Hyperparameteroptimierung durch

Nachdem unsere Daten vorverarbeitet und für die Modellierung bereit sind, ist es an der Zeit, einige ML-Modelle zu trainieren und ihre Hyperparameter zu optimieren, um die Vorhersageleistung zu maximieren. Wir gebrauchen XGBoost-Ray, ein auf Ray basierendes verteiltes Backend für XGBoost, das das Training von XGBoost-Modellen auf großen Datensätzen durch die Verwendung mehrerer Knoten und GPUs ermöglicht. Es bietet einfachen Ersatz für die Trainings- und Vorhersage-APIs von XGBoost und bewältigt gleichzeitig die Komplexität der verteilten Datenverwaltung und des Trainings unter der Haube.

Um die Verteilung des Trainings auf mehrere Knoten zu ermöglichen, verwenden wir eine Hilfsklasse namens RayHelper. Wie im folgenden Code gezeigt, verwenden wir die Ressourcenkonfiguration des Trainingsjobs und wählen den ersten Host als Hauptknoten:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

Wir können die Host-Informationen verwenden, um zu entscheiden, wie Ray für jede der Trainingsjob-Instanzen initialisiert wird:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

Wenn ein Trainingsjob gestartet wird, kann ein Ray-Cluster durch Aufrufen von initialisiert werden start_ray() -Methode für eine Instanz von RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

Zum Training nutzen wir dann den XGBoost-Trainer von XGBoost-Ray:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

Beachten Sie, dass beim Instanziieren des trainer, wir passieren RayParams, wobei die Anzahl der Schauspieler und die Anzahl der CPUs pro Schauspieler berücksichtigt werden. XGBoost-Ray verwendet diese Informationen, um das Training auf alle mit dem Ray-Cluster verbundenen Knoten zu verteilen.

Wir erstellen nun ein XGBoost-Schätzobjekt basierend auf dem SageMaker Python SDK und verwenden dieses für den HPO-Job.

Orchestrieren Sie die vorherigen Schritte mit SageMaker Pipelines

Um einen durchgängig skalierbaren und wiederverwendbaren ML-Workflow aufzubauen, müssen wir ein CI/CD-Tool verwenden, um die vorherigen Schritte in einer Pipeline zu orchestrieren. SageMaker Pipelines verfügt über eine direkte Integration mit SageMaker, dem SageMaker Python SDK und SageMaker Studio. Durch diese Integration können Sie ML-Workflows mit einem benutzerfreundlichen Python SDK erstellen und Ihren Workflow dann mit SageMaker Studio visualisieren und verwalten. Sie können auch den Verlauf Ihrer Daten innerhalb der Pipeline-Ausführung verfolgen und Schritte für die Zwischenspeicherung festlegen.

SageMaker Pipelines erstellt einen gerichteten azyklischen Graphen (DAG), der die zum Aufbau eines ML-Workflows erforderlichen Schritte enthält. Jede Pipeline besteht aus einer Reihe miteinander verbundener Schritte, die durch Datenabhängigkeiten zwischen den Schritten orchestriert werden, und kann parametrisiert werden, sodass Sie Eingabevariablen als Parameter für jeden Lauf der Pipeline bereitstellen können. SageMaker Pipelines verfügt über vier Arten von Pipeline-Parametern: ParameterString, ParameterInteger, ParameterFloat und ParameterBoolean. In diesem Abschnitt parametrisieren wir einige der Eingabevariablen und richten die Schritt-Caching-Konfiguration ein:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Wir definieren zwei Verarbeitungsschritte: einen für die Aufnahme des SageMaker Feature Store, den anderen für die Datenvorbereitung. Dies sollte den zuvor beschriebenen Schritten sehr ähnlich sein. Die einzige neue Codezeile ist die ProcessingStep nach der Definition der Schritte, was es uns ermöglicht, die Konfiguration des Verarbeitungsauftrags zu übernehmen und sie als Pipeline-Schritt einzubinden. Wir spezifizieren außerdem die Abhängigkeit des Datenvorbereitungsschritts vom SageMaker Feature Store-Aufnahmeschritt. Siehe den folgenden Code:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Ebenso müssen wir zum Erstellen eines Modellschulungs- und Optimierungsschritts eine Definition von hinzufügen TuningStep nach dem Code des Modelltrainingsschritts, damit wir die SageMaker-Hyperparameteroptimierung als Schritt in der Pipeline ausführen können:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

Nach dem Optimierungsschritt entscheiden wir uns, das beste Modell in der SageMaker Model Registry zu registrieren. Um die Modellqualität zu kontrollieren, implementieren wir ein Minimum Quality Gate, das die objektive Metrik (RMSE) des besten Modells mit einem Schwellenwert vergleicht, der als Eingabeparameter der Pipeline definiert ist rmse_threshold. Um diese Auswertung durchzuführen, erstellen wir einen weiteren Verarbeitungsschritt, um einen auszuführen Bewertungsskript. Das Modellbewertungsergebnis wird als Eigenschaftendatei gespeichert. Eigenschaftendateien sind besonders nützlich, wenn Sie die Ergebnisse eines Verarbeitungsschritts analysieren, um zu entscheiden, wie andere Schritte ausgeführt werden sollen. Siehe den folgenden Code:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

Wir definieren a ModelStep um das beste Modell in unserer Pipeline in der SageMaker Model Registry zu registrieren. Für den Fall, dass das beste Modell unsere vorgegebene Qualitätsprüfung nicht besteht, geben wir zusätzlich eine FailStep um eine Fehlermeldung auszugeben:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

Als nächstes verwenden wir a ConditionStep um zu bewerten, ob der Modellregistrierungsschritt oder der Fehlerschritt als nächstes in der Pipeline durchgeführt werden sollte. In unserem Fall wird das beste Modell registriert, wenn sein RMSE-Score unter dem Schwellenwert liegt.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

Abschließend orchestrieren wir alle definierten Schritte in einer Pipeline:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

Die vorangehende Pipeline kann direkt in SageMaker Studio visualisiert und ausgeführt oder durch Aufruf ausgeführt werden execution = training_pipeline.start(). Die folgende Abbildung veranschaulicht den Pipeline-Fluss.

SageMaker-Pipeline-DAG

Darüber hinaus können wir die Herkunft der durch die Pipeline-Ausführung generierten Artefakte überprüfen.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

Stellen Sie das Modell bereit

Nachdem das beste Modell über einen Pipeline-Lauf in der SageMaker Model Registry registriert wurde, stellen wir das Modell mithilfe der vollständig verwalteten Modellbereitstellungsfunktionen von SageMaker auf einem Echtzeit-Endpunkt bereit. SageMaker verfügt über weitere Modellbereitstellungsoptionen, um den Anforderungen verschiedener Anwendungsfälle gerecht zu werden. Einzelheiten finden Sie unter Stellen Sie Modelle zur Inferenz bereit bei der Auswahl der richtigen Option für Ihren Anwendungsfall. Lassen Sie uns zunächst das Modell in der SageMaker-Modellregistrierung registrieren:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

Der aktuelle Status des Modells ist PendingApproval. Wir müssen seinen Status auf setzen Approved vor dem Einsatz:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

Aufräumen

Denken Sie nach dem Experimentieren daran, die Ressourcen aufzuräumen, um unnötige Kosten zu vermeiden. Löschen Sie zum Bereinigen den Echtzeitendpunkt, die Modellgruppe, die Pipeline und die Featuregruppe, indem Sie die APIs aufrufen Endpunkt löschen, DeleteModelPackageGroup, Pipeline löschen und FeatureGroup löschen, und fahren Sie alle SageMaker Studio-Notebook-Instanzen herunter.

Zusammenfassung

Dieser Beitrag zeigt eine schrittweise Anleitung zur Verwendung von SageMaker Pipelines zur Orchestrierung Ray-basierter ML-Workflows. Wir haben auch die Fähigkeit von SageMaker Pipelines demonstriert, sich in ML-Tools von Drittanbietern zu integrieren. Es gibt verschiedene AWS-Services, die Ray-Workloads auf skalierbare und sichere Weise unterstützen, um hervorragende Leistung und betriebliche Effizienz zu gewährleisten. Jetzt sind Sie an der Reihe, diese leistungsstarken Funktionen zu erkunden und mit der Optimierung Ihrer Machine-Learning-Workflows mit Amazon SageMaker Pipelines und Ray zu beginnen. Werden Sie noch heute aktiv und schöpfen Sie das volle Potenzial Ihrer ML-Projekte aus!


Über den Autor

Orchestrieren Sie Ray-basierte Arbeitsabläufe für maschinelles Lernen mit Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Raju Rangan ist Senior Solutions Architect bei Amazon Web Services (AWS). Er arbeitet mit staatlich geförderten Einrichtungen zusammen und hilft ihnen beim Aufbau von KI/ML-Lösungen mithilfe von AWS. Wenn er nicht gerade an Cloud-Lösungen bastelt, erwischt man ihn dabei, wie er Zeit mit der Familie verbringt oder mit Freunden bei einer lebhaften Badmintonpartie Birdies schlägt.

Orchestrieren Sie Ray-basierte Arbeitsabläufe für maschinelles Lernen mit Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. Vertikale Suche. Ai.Sherry Ding ist ein leitender KI/ML-Spezialist für Lösungsarchitekten bei Amazon Web Services (AWS). Sie verfügt über umfangreiche Erfahrung im maschinellen Lernen und hat einen Doktortitel in Informatik. Sie arbeitet hauptsächlich mit Kunden aus dem öffentlichen Sektor an verschiedenen geschäftlichen Herausforderungen im Zusammenhang mit KI/ML und hilft ihnen, ihre Reise zum maschinellen Lernen in der AWS Cloud zu beschleunigen. Wenn sie nicht gerade Kunden betreut, genießt sie Outdoor-Aktivitäten.

Zeitstempel:

Mehr von AWS Maschinelles Lernen