Organizuj przepływy pracy oparte na uczeniu maszynowym Ray za pomocą Amazon SageMaker | Usługi internetowe Amazona

Organizuj przepływy pracy oparte na uczeniu maszynowym Ray za pomocą Amazon SageMaker | Usługi internetowe Amazona

Uczenie maszynowe (ML) staje się coraz bardziej złożone, ponieważ klienci próbują rozwiązywać coraz trudniejsze problemy. Ta złożoność często prowadzi do konieczności rozproszonego uczenia maszynowego, w którym do uczenia jednego modelu używanych jest wiele maszyn. Chociaż umożliwia to równoległość zadań w wielu węzłach, co prowadzi do skrócenia czasu szkolenia, zwiększonej skalowalności i lepszej wydajności, efektywne wykorzystanie rozproszonego sprzętu wiąże się z poważnymi wyzwaniami. Analitycy danych muszą stawić czoła wyzwaniom, takim jak partycjonowanie danych, równoważenie obciążenia, odporność na awarie i skalowalność. Inżynierowie ML muszą ręcznie obsługiwać równoległość, planowanie, błędy i ponowne próby, co wymaga złożonego kodu infrastruktury.

W tym poście omawiamy korzyści płynące z używania Ray Wilson i Amazon Sage Maker dla rozproszonego uczenia maszynowego i zapewniają przewodnik krok po kroku dotyczący korzystania z tych struktur do tworzenia i wdrażania skalowalnego przepływu pracy ML.

Ray, platforma do przetwarzania rozproszonego typu open source, zapewnia elastyczną platformę do rozproszonego szkolenia i obsługi modeli uczenia maszynowego. Abstrahuje szczegóły systemu rozproszonego niskiego poziomu za pomocą prostych, skalowalnych bibliotek do typowych zadań ML, takich jak wstępne przetwarzanie danych, szkolenie rozproszone, dostrajanie hiperparametrów, uczenie się przez wzmacnianie i udostępnianie modeli.

SageMaker to w pełni zarządzana usługa służąca do tworzenia, szkolenia i wdrażania modeli uczenia maszynowego. Ray bezproblemowo integruje się z funkcjami SageMaker, aby tworzyć i wdrażać złożone obciążenia ML, które są zarówno wydajne, jak i niezawodne. Połączenie Ray i SageMaker zapewnia kompleksowe możliwości skalowalnych przepływów pracy ML i ma następujące wyróżnione funkcje:

  • Rozproszeni aktorzy i konstrukcje równoległości w Ray upraszczają tworzenie aplikacji rozproszonych.
  • Ray AI Runtime (AIR) zmniejsza trudności związane z przechodzeniem od etapu rozwoju do produkcji. Dzięki Ray i AIR ten sam kod Pythona można bezproblemowo skalować z laptopa do dużego klastra.
  • Zarządzana infrastruktura SageMaker i funkcje takie jak zadania przetwarzania, zadania szkoleniowe i zadania dostrajania hiperparametrów mogą wykorzystywać znajdujące się pod nią biblioteki Ray do przetwarzania rozproszonego.
  • Eksperymenty Amazon SageMaker umożliwia szybkie iterowanie i śledzenie prób.
  • Sklep funkcji Amazon SageMaker zapewnia skalowalne repozytorium do przechowywania, pobierania i udostępniania funkcji ML na potrzeby uczenia modeli.
  • Wytrenowane modele można przechowywać, wersjonować i śledzić Rejestr modelu Amazon SageMaker dla rządzenia i zarządzania.
  • Rurociągi Amazon SageMaker umożliwia koordynację kompleksowego cyklu życia uczenia maszynowego, od przygotowania danych i szkolenia po modelowanie wdrażania w postaci zautomatyzowanych przepływów pracy.

Omówienie rozwiązania

W tym poście skupiono się na korzyściach płynących ze wspólnego używania Ray i SageMaker. Skonfigurowaliśmy kompleksowy przepływ pracy oparty na technologii Ray, zorganizowany przy użyciu SageMaker Pipelines. Przepływ pracy obejmuje równoległe pozyskiwanie danych do magazynu funkcji przy użyciu aktorów Raya, wstępne przetwarzanie danych za pomocą Ray Data, modele szkoleniowe i dostrajanie hiperparametrów na dużą skalę przy użyciu Ray Train i zadań dostrajania optymalizacji hiperparametrów (HPO), a na koniec ocenę modelu i rejestrację modelu w rejestr modeli.

Do naszych danych używamy syntetyczny zbiór danych mieszkaniowych składający się z ośmiu funkcji (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, DECK), a nasz model będzie przewidywał PRICE domu.

Każdy etap przepływu pracy ML jest podzielony na oddzielne kroki z własnym skryptem, który pobiera parametry wejściowe i wyjściowe. W następnej sekcji wyróżniamy kluczowe fragmenty kodu z każdego kroku. Pełny kod można znaleźć na stronie aws-samples-for-ray repozytorium GitHub.

Wymagania wstępne

Aby skorzystać z zestawu SDK języka Python SageMaker i uruchomić kod powiązany z tym postem, potrzebne są następujące wymagania wstępne:

Pozyskuj dane do sklepu z funkcjami SageMaker

Pierwszym krokiem w przepływie pracy ML jest odczytanie źródłowego pliku danych Usługa Amazon Simple Storage (Amazon S3) w formacie CSV i pobierz go w sklepie z funkcjami SageMaker. SageMaker Feature Store to specjalnie zaprojektowane repozytorium, które ułatwia zespołom tworzenie, udostępnianie i zarządzanie funkcjami ML. Upraszcza odkrywanie, ponowne wykorzystywanie i udostępnianie funkcji, co prowadzi do szybszego programowania, lepszej współpracy w zespołach klientów i obniżonych kosztów.

Pozyskiwanie funkcji do magazynu funkcji obejmuje następujące kroki:

  1. Zdefiniuj grupę funkcji i utwórz ją w magazynie funkcji.
  2. Przygotuj dane źródłowe dla magazynu funkcji, dodając czas zdarzenia i identyfikator rekordu dla każdego wiersza danych.
  3. Pozyskuj przygotowane dane do grupy funkcji przy użyciu zestawu SDK Boto3.

W tej sekcji wyróżniamy tylko krok 3, ponieważ jest to część obejmująca równoległe przetwarzanie zadania pozyskiwania za pomocą Ray. Możesz przejrzeć pełny kod tego procesu w pliku GitHub repo.

Połączenia funkcje spożycia metoda jest zdefiniowana wewnątrz klasy o nazwie Featurestore. Zauważ, że Featurestore klasa jest udekorowana @ray.remote. Oznacza to, że instancją tej klasy jest aktor Ray, stanowa i współbieżna jednostka obliczeniowa w Ray. Jest to model programowania, który umożliwia tworzenie obiektów rozproszonych, które zachowują stan wewnętrzny i do których można uzyskać dostęp jednocześnie przez wiele zadań uruchomionych w różnych węzłach klastra Ray. Aktorzy umożliwiają zarządzanie zmiennym stanem i hermetyzację go, dzięki czemu są cenni przy tworzeniu złożonych, stanowych aplikacji w środowisku rozproszonym. Możesz także określić wymagania dotyczące zasobów w aktorach. W tym przypadku każda instancja FeatureStore class będzie wymagać 0.5 procesora. Zobacz następujący kod:

@ray.remote(num_cpus=0.5)
class Featurestore: def ingest_features(self,feature_group_name, df, region): """ Ingest features to Feature Store Group Args: feature_group_name (str): Feature Group Name data_path (str): Path to the train/validation/test data in CSV format. """ ...

Możesz wchodzić w interakcję z aktorem, dzwoniąc pod numer remote operator. W poniższym kodzie żądana liczba aktorów jest przekazywana jako argument wejściowy do skryptu. Dane są następnie dzielone na partycje w oparciu o liczbę aktorów i przekazywane do zdalnych procesów równoległych w celu ich wykorzystania w magazynie funkcji. Możesz zadzwonić get na obiekcie ref, aby zablokować wykonanie bieżącego zadania do czasu zakończenia zdalnych obliczeń i udostępnienia wyniku. Gdy wynik będzie dostępny, ray.get zwróci wynik, a wykonywanie bieżącego zadania będzie kontynuowane.

import modin.pandas as pd
import ray df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = [] for actor, partition in zip(actors, input_partitions): results.append(actor.ingest_features.remote( args.feature_group_name, partition, args.region ) ) ray.get(results)

Przygotuj dane do szkolenia, walidacji i testowania

Na tym etapie używamy zestawu danych Ray do wydajnego dzielenia, przekształcania i skalowania naszego zestawu danych w ramach przygotowań do uczenia maszynowego. Ray Dataset zapewnia standardowy sposób ładowania rozproszonych danych do Ray, obsługując różne systemy przechowywania i formaty plików. Posiada interfejsy API do typowych operacji wstępnego przetwarzania danych ML, takich jak transformacje równoległe, tasowanie, grupowanie i agregacje. Ray Dataset obsługuje również operacje wymagające konfiguracji stanowej i akceleracji GPU. Płynnie integruje się z innymi bibliotekami przetwarzania danych, takimi jak Spark, Pandas, NumPy i innymi, a także platformami ML, takimi jak TensorFlow i PyTorch. Umożliwia to budowanie kompleksowych potoków danych i przepływów pracy ML w oparciu o Ray. Celem jest ułatwienie praktykom i badaczom rozproszonego przetwarzania danych i uczenia maszynowego.

Przyjrzyjmy się sekcję skryptów, które wykonują wstępne przetwarzanie danych. Zaczynamy od załadowania danych ze sklepu z funkcjami:

def load_dataset(feature_group_name, region): """ Loads the data as a ray dataset from the offline featurestore S3 location Args: feature_group_name (str): name of the feature group Returns: ds (ray.data.dataset): Ray dataset the contains the requested dat from the feature store """ session = sagemaker.Session(boto3.Session(region_name=region)) fs_group = FeatureGroup( name=feature_group_name, sagemaker_session=session ) fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri") # Drop columns added by the feature store # Since these are not related to the ML problem at hand cols_to_drop = ["record_id", "event_time","write_time", "api_invocation_time", "is_deleted", "year", "month", "day", "hour"] ds = ray.data.read_parquet(fs_data_loc) ds = ds.drop_columns(cols_to_drop) print(f"{fs_data_loc} count is {ds.count()}") return ds

Następnie dzielimy i skalujemy dane, korzystając z abstrakcji wyższego poziomu dostępnych w witrynie ray.data biblioteka:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None): """ Split dataset into train, validation and test samples Args: dataset (ray.data.Dataset): input data train_size (float): ratio of data to use as training dataset val_size (float): ratio of data to use as validation dataset test_size (float): ratio of data to use as test dataset random_state (int): Pass an int for reproducible output across multiple function calls. Returns: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset """ # Shuffle this dataset with a fixed random seed. shuffled_ds = dataset.random_shuffle(seed=random_state) # Split the data into train, validation and test datasets train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size]) return train_set, val_set, test_set def scale_dataset(train_set, val_set, test_set, target_col): """ Fit StandardScaler to train_set and apply it to val_set and test_set Args: train_set (ray.data.Dataset): train dataset val_set (ray.data.Dataset): validation dataset test_set (ray.data.Dataset): test dataset target_col (str): target col Returns: train_transformed (ray.data.Dataset): train data scaled val_transformed (ray.data.Dataset): val data scaled test_transformed (ray.data.Dataset): test data scaled """ tranform_cols = dataset.columns() # Remove the target columns from being scaled tranform_cols.remove(target_col) # set up a standard scaler standard_scaler = StandardScaler(tranform_cols) # fit scaler to training dataset print("Fitting scaling to training data and transforming dataset...") train_set_transformed = standard_scaler.fit_transform(train_set) # apply scaler to validation and test datasets print("Transforming validation and test datasets...") val_set_transformed = standard_scaler.transform(val_set) test_set_transformed = standard_scaler.transform(test_set) return train_set_transformed, val_set_transformed, test_set_transformed

Przetworzone zestawy danych pociągowych, walidacyjnych i testowych są przechowywane w Amazon S3 i będą przekazywane jako parametry wejściowe do kolejnych kroków.

Przeprowadź szkolenie modelu i optymalizację hiperparametrów

Ponieważ nasze dane są wstępnie przetworzone i gotowe do modelowania, nadszedł czas na przeszkolenie niektórych modeli uczenia maszynowego i dostrojenie ich hiperparametrów, aby zmaksymalizować wydajność predykcyjną. Używamy XGBoost-Promień, rozproszony backend dla XGBoost oparty na Ray, który umożliwia trenowanie modeli XGBoost na dużych zbiorach danych przy użyciu wielu węzłów i procesorów graficznych. Zapewnia proste, natychmiastowe zamienniki dla pociągów i przewidywań interfejsów API XGBoost, jednocześnie radząc sobie ze złożonością rozproszonego zarządzania danymi i szkolenia pod maską.

Aby umożliwić dystrybucję uczenia na wiele węzłów, używamy klasy pomocniczej o nazwie RayHelper. Jak pokazano w poniższym kodzie, używamy konfiguracji zasobów zadania szkoleniowego i wybieramy pierwszego hosta jako węzeł główny:

class RayHelper(): def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"): .... self.resource_config = self.get_resource_config() self.head_host = self.resource_config["hosts"][0] self.n_hosts = len(self.resource_config["hosts"])

Możemy użyć informacji o hoście, aby zdecydować, jak zainicjować Ray w każdej instancji zadania szkoleniowego:

def start_ray(self): head_ip = self._get_ip_from_host() # If the current host is the host choosen as the head node # run `ray start` with specifying the --head flag making this is the head node if self.resource_config["current_host"] == self.head_host: output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', self.ray_port, '--redis-password', self.redis_pass, '--include-dashboard', 'false'], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) ray.init(address="auto", include_dashboard=False) self._wait_for_workers() print("All workers present and accounted for") print(ray.cluster_resources()) else: # If the current host is not the head node, # run `ray start` with specifying ip address as the head_host as the head node time.sleep(10) output = subprocess.run(['ray', 'start', f"--address={head_ip}:{self.ray_port}", '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE) print(output.stdout.decode("utf-8")) sys.exit(0)

Po rozpoczęciu zadania szkoleniowego klaster Ray można zainicjować, wywołując metodę start_ray() metoda na instancji RayHelper:

if __name__ == '__main__': ray_helper = RayHelper() ray_helper.start_ray() args = read_parameters() sess = sagemaker.Session(boto3.Session(region_name=args.region))

Następnie do treningu używamy trenera XGBoost firmy XGBoost-Ray:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result: """ Creates a XGBoost trainer, train it, and return the result. Args: ds_train (ray.data.dataset): Training dataset ds_val (ray.data.dataset): Validation dataset params (dict): Hyperparameters num_workers (int): number of workers to distribute the training across target_col (str): target column Returns: result (ray.air.result.Result): Result of the training job """ train_set = RayDMatrix(ds_train, 'PRICE') val_set = RayDMatrix(ds_val, 'PRICE') evals_result = {} trainer = train( params=params, dtrain=train_set, evals_result=evals_result, evals=[(val_set, "validation")], verbose_eval=False, num_boost_round=100, ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1), ) output_path=os.path.join(args.model_dir, 'model.xgb') trainer.save_model(output_path) valMAE = evals_result["validation"]["mae"][-1] valRMSE = evals_result["validation"]["rmse"][-1] print('[3] #011validation-mae:{}'.format(valMAE)) print('[4] #011validation-rmse:{}'.format(valRMSE)) local_testing = False try: load_run(sagemaker_session=sess) except: local_testing = True if not local_testing: # Track experiment if using SageMaker Training with load_run(sagemaker_session=sess) as run: run.log_metric('validation-mae', valMAE) run.log_metric('validation-rmse', valRMSE)

Należy pamiętać, że podczas tworzenia instancji trainer, zdajemy RayParams, który przyjmuje liczbę aktorów i liczbę procesorów na aktorów. XGBoost-Ray wykorzystuje te informacje do dystrybucji uczenia pomiędzy wszystkimi węzłami dołączonymi do klastra Ray.

Tworzymy teraz obiekt estymatora XGBoost w oparciu o pakiet SageMaker Python SDK i używamy go do zadania HPO.

Zorganizuj poprzednie kroki, korzystając z SageMaker Pipelines

Aby zbudować kompleksowy, skalowalny i nadający się do ponownego użycia przepływ pracy ML, musimy użyć narzędzia CI/CD do zorganizowania poprzednich kroków w potok. SageMaker Pipelines ma bezpośrednią integrację z SageMaker, pakietem SageMaker Python SDK i SageMaker Studio. Ta integracja umożliwia tworzenie przepływów pracy ML za pomocą łatwego w użyciu zestawu SDK języka Python, a następnie wizualizację przepływu pracy i zarządzanie nim za pomocą SageMaker Studio. Możesz także śledzić historię swoich danych w ramach wykonywania potoku i wyznaczać etapy buforowania.

SageMaker Pipelines tworzy ukierunkowany wykres acykliczny (DAG), który zawiera kroki niezbędne do zbudowania przepływu pracy ML. Każdy potok to seria połączonych ze sobą kroków uporządkowanych według zależności danych między krokami i można go parametryzować, co pozwala na podanie zmiennych wejściowych jako parametrów dla każdego przebiegu potoku. SageMaker Pipelines ma cztery typy parametrów potoku: ParameterString, ParameterInteger, ParameterFloat, ParameterBoolean. W tej sekcji parametryzujemy niektóre zmienne wejściowe i konfigurujemy konfigurację buforowania kroków:

processing_instance_count = ParameterInteger( name='ProcessingInstanceCount', default_value=1
)
feature_group_name = ParameterString( name='FeatureGroupName', default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString( name='Bucket_Prefix', default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0) train_size = ParameterString( name='TrainSize', default_value="0.6"
)
val_size = ParameterString( name='ValidationSize', default_value="0.2"
)
test_size = ParameterString( name='TestSize', default_value="0.2"
) cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

Definiujemy dwa etapy przetwarzania: jeden do wykorzystania magazynu funkcji SageMaker, drugi do przygotowania danych. Powinno to wyglądać bardzo podobnie do poprzednich kroków opisanych wcześniej. Jedyną nową linijką kodu jest ProcessingStep po definicji kroków, co pozwala nam przyjąć konfigurację zadania przetwarzania i uwzględnić ją jako krok potoku. Następnie określamy zależność etapu przygotowania danych od etapu przetwarzania magazynu funkcji SageMaker. Zobacz następujący kod:

feature_store_ingestion_step = ProcessingStep( name='FeatureStoreIngestion', step_args=fs_processor_args, cache_config=cache_config
) preprocess_dataset_step = ProcessingStep( name='PreprocessData', step_args=processor_args, cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Podobnie, aby zbudować etap uczenia i dostrajania modelu, musimy dodać definicję TuningStep po kodzie kroku szkolenia modelu, co umożliwi nam uruchomienie dostrajania hiperparametrów SageMaker w ramach etapu potoku:

tuning_step = TuningStep( name="HPTuning", tuner=tuner, inputs={ "train": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

Po etapie dostrajania decydujemy się zarejestrować najlepszy model w rejestrze modeli SageMaker. Aby kontrolować jakość modelu, wdrażamy bramkę minimalnej jakości, która porównuje obiektywną metrykę najlepszego modelu (RMSE) z progiem zdefiniowanym jako parametr wejściowy potoku rmse_threshold. Aby przeprowadzić tę ocenę, tworzymy kolejny etap przetwarzania, aby uruchomić plik skrypt ewaluacyjny. Wynik oceny modelu zostanie zapisany jako plik właściwości. Pliki właściwości są szczególnie przydatne podczas analizowania wyników etapu przetwarzania w celu podjęcia decyzji, w jaki sposób należy uruchomić inne kroki. Zobacz następujący kod:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile( name='EvaluationReport', output_name='evaluation', path='evaluation.json',
) # A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. # In this case, the top performing model is evaluated. evaluation_step = ProcessingStep( name='EvaluateModel', processor=evaluation_processor, inputs=[ ProcessingInput( source=tuning_step.get_top_model_s3_uri( top_k=0, s3_bucket=bucket, prefix=s3_prefix ), destination='/opt/ml/processing/model', ), ProcessingInput( source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, destination='/opt/ml/processing/test', ), ], outputs=[ ProcessingOutput( output_name='evaluation', source='/opt/ml/processing/evaluation' ), ], code='./pipeline_scripts/evaluate/script.py', property_files=[evaluation_report],
)

Definiujemy a ModelStep aby zarejestrować najlepszy model w naszym rejestrze modeli SageMaker. W przypadku, gdy najlepszy model nie przejdzie naszej wcześniej ustalonej kontroli jakości, dodatkowo określamy: FailStep aby wyświetlić komunikat o błędzie:

register_step = ModelStep( name='RegisterTrainedModel', step_args=model_registry_args
) metrics_fail_step = FailStep( name="RMSEFail", error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

Następnie używamy a ConditionStep aby ocenić, czy krok rejestracji modelu, czy krok niepowodzenia powinien być wykonany jako następny w potoku. W naszym przypadku najlepszy model zostanie zarejestrowany, jeśli jego wynik RMSE będzie niższy od progu.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=evaluation_step.name, property_file=evaluation_report, json_path='regression_metrics.rmse.value', ), right=rmse_threshold,
)
condition_step = ConditionStep( name='CheckEvaluation', conditions=[cond_lte], if_steps=[register_step], else_steps=[metrics_fail_step],
)

Na koniec organizujemy wszystkie zdefiniowane kroki w potok:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [ feature_store_ingestion_step, preprocess_dataset_step, tuning_step, evaluation_step, condition_step ] training_pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, feature_group_name, train_size, val_size, test_size, bucket_prefix, rmse_threshold ], steps=step_list
) # Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

Powyższy potok można wizualizować i wykonywać bezpośrednio w SageMaker Studio lub wykonać poprzez wywołanie execution = training_pipeline.start(). Poniższy rysunek ilustruje przepływ rurociągu.

Rurociąg SageMaker DAG

Dodatkowo możemy przejrzeć pochodzenie artefaktów generowanych przez wykonanie potoku.

from sagemaker.lineage.visualizer import LineageTableVisualizer viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()): print(execution_step) display(viz.show(pipeline_execution_step=execution_step)) time.sleep(5)

Wdróż model

Po zarejestrowaniu najlepszego modelu w rejestrze modeli SageMaker poprzez uruchomienie potoku, wdrażamy model w punkcie końcowym w czasie rzeczywistym, korzystając z w pełni zarządzanych możliwości wdrażania modelu SageMaker. SageMaker oferuje inne opcje wdrażania modelu, aby sprostać potrzebom różnych przypadków użycia. Aby poznać szczegóły, patrz Wdrażaj modele na potrzeby wnioskowania przy wyborze odpowiedniej opcji dla swojego przypadku użycia. Najpierw zarejestrujmy model w rejestrze modeli SageMaker:

xgb_regressor_model = ModelPackage( role_arn, model_package_arn=model_package_arn, name=model_name
)

Obecny stan modelu to PendingApproval. Musimy ustawić jego status na Approved przed wdrożeniem:

sagemaker_client.update_model_package( ModelPackageArn=xgb_regressor_model.model_package_arn, ModelApprovalStatus='Approved'
) xgb_regressor_model.deploy( initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name
)

Sprzątać

Po zakończeniu eksperymentów pamiętaj o oczyszczeniu zasobów, aby uniknąć niepotrzebnych opłat. Aby wyczyścić, usuń punkt końcowy czasu rzeczywistego, grupę modeli, potok i grupę funkcji, wywołując interfejsy API Usuń punkt końcowy, UsuńModelPackageGroup, Usuń potok, Usuń grupę funkcji, i zamknij wszystkie instancje notatników SageMaker Studio.

Wnioski

W tym poście pokazano krok po kroku, jak używać SageMaker Pipelines do organizowania przepływów pracy opartych na Ray. Zademonstrowaliśmy także zdolność SageMaker Pipelines do integracji z narzędziami ML innych firm. Istnieją różne usługi AWS, które obsługują obciążenia Ray w skalowalny i bezpieczny sposób, aby zapewnić doskonałą wydajność i efektywność operacyjną. Teraz Twoja kolej, aby poznać te potężne możliwości i rozpocząć optymalizację procesów uczenia maszynowego za pomocą Amazon SageMaker Pipelines i Ray. Podejmij działania już dziś i odblokuj pełny potencjał swoich projektów ML!


O autorze

Orchestrate Ray-based machine learning workflows using Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Raju Rangana jest starszym architektem rozwiązań w Amazon Web Services (AWS). Współpracuje z podmiotami sponsorowanymi przez rząd, pomagając im budować rozwiązania AI/ML przy użyciu AWS. Jeśli nie majsterkujesz przy rozwiązaniach chmurowych, przyłapiesz go spędzającego czas z rodziną lub rozbijającego ptaszki podczas ożywionej gry w badmintona z przyjaciółmi.

Orchestrate Ray-based machine learning workflows using Amazon SageMaker | Amazon Web Services PlatoBlockchain Data Intelligence. Vertical Search. Ai.Sherry Ding jest starszym architektem rozwiązań specjalistycznych AI/ML w Amazon Web Services (AWS). Posiada duże doświadczenie w uczeniu maszynowym, czego dowodem jest stopień doktora informatyki. Pracuje głównie z klientami z sektora publicznego nad różnymi wyzwaniami biznesowymi związanymi ze sztuczną inteligencją/ML, pomagając im przyspieszyć proces uczenia maszynowego w chmurze AWS. Kiedy nie pomaga klientom, lubi zajęcia na świeżym powietrzu.

Znak czasu:

Więcej z Uczenie maszynowe AWS