Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK

Rurociągi Amazon SageMaker pozwala naukowcom danych i inżynierom uczenia maszynowego (ML) zautomatyzować przepływy pracy szkoleniowej, co pomaga w tworzeniu powtarzalnego procesu orkiestracji etapów opracowywania modelu w celu szybkiego eksperymentowania i ponownego uczenia modelu. Można zautomatyzować cały przepływ pracy budowania modelu, w tym przygotowanie danych, inżynierię funkcji, uczenie modelu, dostrajanie modelu i walidację modelu, a także skatalogować go w rejestrze modeli. Potoki można skonfigurować tak, aby były uruchamiane automatycznie w regularnych odstępach czasu lub po wyzwoleniu określonych zdarzeń, albo można je uruchamiać ręcznie w razie potrzeby.

W tym poście przedstawiamy niektóre z ulepszeń w Amazon Sage Maker SDK i wprowadzają nowe funkcje Amazon SageMaker Pipelines, które ułatwiają praktykom ML tworzenie i trenowanie modeli ML.

Pipelines kontynuuje wprowadzanie innowacji, a dzięki najnowszym wydaniom możesz teraz korzystać z usługi w bardziej spersonalizowany sposób:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Zaktualizowana dokumentacja na PipelineVariable wykorzystanie dla klas bazowych estymatora, procesora, tunera, transformatora i modelu, modeli Amazon i modeli frameworka. W nowszych wersjach SDK pojawią się dodatkowe zmiany, które będą obsługiwać wszystkie podklasy estymatorów i procesorów.
  • 2.90.0 - Dostępność Model Krok do zadań związanych z tworzeniem i rejestracją zasobów modelu zintegrowanego.
  • 2.88.2 - Dostępność Sesja Pipeline do zarządzanej interakcji z podmiotami i zasobami SageMaker.
  • 2.88.2 – Kompatybilność podklas dla kroki zadania potoku przepływu pracy dzięki czemu można tworzyć abstrakcje zadań oraz konfigurować i uruchamiać przetwarzanie, trenowanie, przekształcanie i dostrajanie zadań tak, jak bez potoku.
  • 2.76.0 - Dostępność FazaKrok do warunkowego zatrzymania potoku ze stanem awarii.

W tym poście przeprowadzimy Cię przez przepływ pracy przy użyciu przykładowego zestawu danych z naciskiem na budowanie i wdrażanie modelu, aby zademonstrować, jak zaimplementować nowe funkcje Pipelines. Na koniec powinieneś mieć wystarczającą ilość informacji, aby pomyślnie korzystać z tych nowszych funkcji i uprościć obciążenia ML.

Przegląd funkcji

Pipelines oferuje następujące nowe funkcje:

  • Adnotacja zmiennej potoku – Niektóre parametry metody akceptują wiele typów danych wejściowych, w tym PipelineVariablesi dodano dodatkową dokumentację, aby wyjaśnić, gdzie PipelineVariables są obsługiwane zarówno w najnowszej stabilnej wersji dokumentacji SDK SageMaker, jak i sygnaturze init funkcji. Na przykład w poniższym estymatorze TensorFlow sygnatura init pokazuje teraz, że model_dir i image_uri wsparcie PipelineVariables, podczas gdy inne parametry nie. Aby uzyskać więcej informacji, zobacz Estymator przepływu Tensor.
    • Przed:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Po:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Sesja potokowa - Sesja Pipeline to nowa koncepcja wprowadzona w celu zapewnienia jedności w pakiecie SageMaker SDK i wprowadzająca leniwą inicjalizację zasobów potoku (wywołania uruchamiania są przechwytywane, ale nie są uruchamiane, dopóki potok nie zostanie utworzony i uruchomiony). The PipelineSession kontekst dziedziczy SageMakerSession i wdraża wygodne metody interakcji z innymi jednostkami i zasobami SageMaker, takimi jak zadania szkoleniowe, punkty końcowe i zestawy danych wejściowych przechowywane w Usługa Amazon Simple Storage (Amazonka S3).
  • Zgodność podklas z etapami zadań potoku przepływu pracy – Można teraz tworzyć abstrakcje zadań oraz konfigurować i uruchamiać przetwarzanie, trenowanie, przekształcanie i dostrajanie zadań tak samo, jak bez potoku.
    • Na przykład tworzenie etapu przetwarzania za pomocą SKLearnProcessor wcześniej wymagane:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Jak widzimy w poprzednim kodzie, ProcessingStep musi wykonać w zasadzie tę samą logikę przetwarzania wstępnego, co .run, po prostu bez inicjowania wywołania interfejsu API w celu uruchomienia zadania. Ale po włączeniu kompatybilności podklas z krokami zadań potoku przepływu pracy, deklarujemy step_args argument, który przyjmuje logikę przetwarzania wstępnego z .run, dzięki czemu można zbudować abstrakcję zadania i skonfigurować ją tak, jakbyś jej używał bez Pipelines. Mijamy również pipeline_session, który jest PipelineSession obiekt, zamiast sagemaker_session aby upewnić się, że wywołania uruchamiania są przechwytywane, ale nie są wywoływane, dopóki potok nie zostanie utworzony i uruchomiony. Zobacz następujący kod:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Etap modelu (uproszczone podejście z etapami tworzenia i rejestracji modelu) – Pipelines oferuje dwa typy kroków do integracji z modelami SageMaker: CreateModelStep i RegisterModel. Możesz teraz osiągnąć oba, używając tylko ModelStep rodzaj. Zwróć uwagę, że PipelineSession aby to osiągnąć. Zapewnia to podobieństwo między krokami potoku i zestawem SDK.
    • Przed:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Po:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail step (warunkowe zatrzymanie przebiegu rurociągu) - FailStep umożliwia zatrzymanie potoku ze stanem niepowodzenia, jeśli warunek zostanie spełniony, na przykład jeśli wynik modelu jest poniżej określonego progu.

Omówienie rozwiązania

W tym rozwiązaniu punktem wyjścia jest Studio Amazon SageMaker zintegrowane środowisko programistyczne (IDE) do szybkiego eksperymentowania. Studio oferuje środowisko do zarządzania kompleksowym środowiskiem Pipelines. Dzięki Studio możesz ominąć Konsola zarządzania AWS dla całego zarządzania przepływem pracy. Aby uzyskać więcej informacji na temat zarządzania potokami z poziomu Studio, zobacz Wyświetlanie, śledzenie i wykonywanie potoków SageMaker w SageMaker Studio.

Poniższy diagram ilustruje architekturę wysokiego poziomu przepływu pracy ML z różnymi etapami uczenia i generowania wniosków przy użyciu nowych funkcji.

Potok obejmuje następujące kroki:

  1. Wstępne przetwarzanie danych w celu zbudowania wymaganych funkcji i podziału danych na zestawy danych dotyczących pociągów, walidacji i testów.
  2. Utwórz zadanie szkoleniowe za pomocą frameworka SageMaker XGBoost.
  3. Oceń wytrenowany model przy użyciu testowego zestawu danych.
  4. Sprawdź, czy wynik AUC przekracza wstępnie zdefiniowany próg.
    • Jeśli wynik AUC jest niższy niż próg, zatrzymaj działanie potoku i oznacz go jako nieudany.
    • Jeśli wynik AUC jest większy niż próg, utwórz model SageMaker i zarejestruj go w rejestrze modeli SageMaker.
  5. Zastosuj transformację wsadową do danego zestawu danych, korzystając z modelu utworzonego w poprzednim kroku.

Wymagania wstępne

Aby śledzić ten post, potrzebujesz konta AWS z Domena studia.

Pipelines jest zintegrowane bezpośrednio z podmiotami i zasobami SageMaker, więc nie musisz wchodzić w interakcje z innymi usługami AWS. Nie musisz również zarządzać żadnymi zasobami, ponieważ jest to usługa w pełni zarządzana, co oznacza, że ​​tworzy zasoby i zarządza nimi za Ciebie. Aby uzyskać więcej informacji na temat różnych komponentów SageMaker, które są zarówno samodzielnymi interfejsami API Pythona, jak i zintegrowanymi komponentami Studio, zobacz Strona produktu SageMaker.

Przed rozpoczęciem zainstaluj pakiet SageMaker SDK w wersji >= 2.104.0 i xlrd >=1.0.0 w notatniku Studio, korzystając z następującego fragmentu kodu:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Przepływ pracy w ML

W tym poście używasz następujących komponentów:

  • Przygotowywanie danych
    • Przetwarzanie SageMaker – SageMaker Processing to w pełni zarządzana usługa umożliwiająca przeprowadzanie niestandardowych transformacji danych i inżynierię funkcji dla obciążeń ML.
  • Budowa modelu
  • Szkolenie i ocena modeli
    • Szkolenie jednym kliknięciem – Funkcja szkolenia rozproszonego SageMaker. SageMaker zapewnia rozproszone biblioteki szkoleniowe dla równoległości danych i równoległości modeli. Biblioteki są zoptymalizowane pod kątem środowiska szkoleniowego SageMaker, pomagają dostosować rozproszone zadania szkoleniowe do SageMaker oraz poprawiają szybkość i wydajność szkolenia.
    • Eksperymenty SageMakera – Eksperymenty to funkcja programu SageMaker, która pozwala organizować, śledzić, porównywać i oceniać iteracje ML.
    • Transformacja wsadowa SageMaker – Przekształcanie wsadowe lub ocenianie offline to usługa zarządzana w programie SageMaker, która umożliwia prognozowanie na większym zestawie danych przy użyciu modeli ML.
  • Orkiestracja przepływu pracy

Potok SageMaker to seria połączonych ze sobą kroków zdefiniowanych przez definicję potoku JSON. Koduje potok za pomocą ukierunkowanego grafu acyklicznego (DAG). DAG podaje informacje o wymaganiach i relacjach między każdym krokiem potoku, a jego struktura jest określana przez zależności danych między krokami. Te zależności są tworzone, gdy właściwości danych wyjściowych kroku są przekazywane jako dane wejściowe do innego kroku.

Poniższy diagram ilustruje różne kroki w potoku SageMaker (dla przypadku użycia przewidywania rezygnacji), w których połączenia między krokami są wywnioskowane przez SageMaker na podstawie danych wejściowych i wyjściowych zdefiniowanych przez definicje kroków.

Kolejne sekcje przechodzą przez tworzenie każdego kroku potoku i uruchamianie całego potoku po jego utworzeniu.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.

Struktura projektu

Zacznijmy od struktury projektu:

  • /sm-pipelines-end-to-end-przykład – Nazwa projektu
    • /dane – Zbiory danych
    • /rurociągi – Pliki kodu dla komponentów potoku
      • /odejście klienta
        • preprocess.py
        • ocena.py
    • sagemaker-pipelines-project.ipynb – Notatnik przechodzący przez proces modelowania przy użyciu nowych funkcji Pipelines

Pobierz zbiór danych

Aby śledzić ten post, musisz pobrać i zapisać przykładowy zbiór danych pod folderem danych w katalogu domowym projektu, który zapisuje plik w System plików Amazon Elastic (Amazon EFS) w środowisku Studio.

Zbuduj komponenty rurociągu

Teraz jesteś gotowy do zbudowania komponentów potoku.

Importuj instrukcje i deklaruj parametry i stałe

Utwórz notatnik Studio o nazwie sagemaker-pipelines-project.ipynb w katalogu domowym projektu. Wprowadź następujący blok kodu w komórce i uruchom komórkę, aby skonfigurować obiekty klienta SageMaker i S3, utwórz PipelineSessioni skonfiguruj lokalizację zasobnika S3, używając domyślnego zasobnika, który jest dostarczany z sesją programu SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines obsługuje parametryzację, która umożliwia określenie parametrów wejściowych w czasie wykonywania bez zmiany kodu potoku. Możesz skorzystać z modułów dostępnych w ramach sagemaker.workflow.parameters moduł, taki jak ParameterInteger, ParameterFloat, ParameterString, aby określić parametry potoku różnych typów danych. Uruchom następujący kod, aby skonfigurować wiele parametrów wejściowych:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Wygeneruj wsadowy zbiór danych

Wygeneruj wsadowy zestaw danych, którego użyjesz później w kroku transformacji wsadowej:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Prześlij dane do zasobnika S3

Prześlij zbiory danych do Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Zdefiniuj skrypt przetwarzania i krok przetwarzania

W tym kroku przygotowujesz skrypt w języku Python do wykonywania inżynierii funkcji, jednego kodowania na gorąco i nadzorujesz podziały szkoleniowe, walidacyjne i testowe, które będą używane do budowania modelu. Uruchom następujący kod, aby skompilować skrypt przetwarzania:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Następnie uruchom następujący blok kodu, aby utworzyć wystąpienie procesora i krok Pipelines, aby uruchomić skrypt przetwarzania. Ponieważ skrypt przetwarzania jest napisany w Pandas, używasz a Procesor SKLearn. Rurociągi ProcessingStep Funkcja przyjmuje następujące argumenty: procesor, wejściowe lokalizacje S3 dla surowych zestawów danych i wyjściowe lokalizacje S3 do zapisywania przetworzonych zestawów danych.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Zdefiniuj etap szkolenia

Skonfiguruj szkolenie modelu za pomocą estymatora SageMaker XGBoost i Pipelines TrainingStep funkcjonować:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Zdefiniuj skrypt oceny i krok oceny modelu

Uruchom następujący blok kodu, aby ocenić model po przeszkoleniu. Ten skrypt hermetyzuje logikę, aby sprawdzić, czy wynik AUC spełnia określony próg.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Następnie uruchom następujący blok kodu, aby utworzyć wystąpienie procesora i krok Pipelines, aby uruchomić skrypt oceny. Ponieważ skrypt oceny używa pakietu XGBoost, używasz ScriptProcessor wraz z obrazem XGBoost. Rurociągi ProcessingStep Funkcja przyjmuje następujące argumenty: procesor, wejściowe lokalizacje S3 dla surowych zestawów danych i wyjściowe lokalizacje S3 do zapisywania przetworzonych zestawów danych.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Zdefiniuj krok tworzenia modelu

Uruchom następujący blok kodu, aby utworzyć model SageMaker przy użyciu kroku Model potoków. Ten krok wykorzystuje dane wyjściowe kroku szkolenia do spakowania modelu do wdrożenia. Zauważ, że wartość argumentu typu instancji jest przekazywana przy użyciu parametru Pipelines, który zdefiniowałeś wcześniej we wpisie.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Zdefiniuj krok transformacji wsadowej

Uruchom następujący blok kodu, aby uruchomić transformację wsadową przy użyciu wytrenowanego modelu z danymi wejściowymi wsadowymi utworzonymi w pierwszym kroku:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Zdefiniuj krok modelu rejestru

Poniższy kod rejestruje model w rejestrze modeli SageMaker przy użyciu kroku modelu Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Zdefiniuj krok niepowodzenia, aby zatrzymać potok

Poniższy kod definiuje krok niepowodzenie potoków, aby zatrzymać uruchomienie potoku z komunikatem o błędzie, jeśli wynik AUC nie spełnia zdefiniowanego progu:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Zdefiniuj krok warunku, aby sprawdzić wynik AUC

Poniższy kod definiuje krok warunku, aby sprawdzić wynik AUC i warunkowo utworzyć model i uruchomić transformację wsadową oraz zarejestrować model w rejestrze modeli lub zatrzymać uruchomienie potoku w stanie niepowodzenia:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Zbuduj i uruchom potok

Po zdefiniowaniu wszystkich kroków składowych można je złożyć w obiekt Pipelines. Nie musisz określać kolejności potoku, ponieważ Pipelines automatycznie określa kolejność kolejności na podstawie zależności między krokami.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Uruchom następujący kod w komórce w notesie. Jeśli potok już istnieje, kod aktualizuje potok. Jeśli potok nie istnieje, tworzy nowy.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Wnioski

W tym poście przedstawiliśmy niektóre z nowych funkcji dostępnych teraz w Pipelines wraz z innymi wbudowanymi funkcjami SageMaker i algorytmem XGBoost do tworzenia, iteracji i wdrażania modelu do przewidywania rezygnacji. Rozwiązanie można rozszerzyć o dodatkowe źródła danych

do wdrożenia własnego przepływu pracy ML. Aby uzyskać więcej informacji na temat kroków dostępnych w obiegu pracy Pipelines, zobacz Rurociąg modelowania Amazon SageMaker i Przepływy pracy SageMaker, Przykłady AWS SageMaker Repozytorium GitHub zawiera więcej przykładów dotyczących różnych przypadków użycia przy użyciu Pipelines.


O autorach

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Jerry'ego Penga jest inżynierem oprogramowania w AWS SageMaker. Koncentruje się na budowaniu kompleksowego systemu MLOps na dużą skalę od szkolenia po monitorowanie modelu w produkcji. Pasjonuje go również przybliżanie koncepcji MLOps szerszej publiczności.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Dewena Qi jest inżynierem rozwoju oprogramowania w AWS. Obecnie koncentruje się na rozwijaniu i ulepszaniu SageMaker Pipelines. Poza pracą lubi ćwiczyć wiolonczelę.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Gajatri Ghanakota jest starszym inżynierem uczenia maszynowego w AWS Professional Services. Pasjonuje się tworzeniem, wdrażaniem i wyjaśnianiem rozwiązań AI/ML w różnych dziedzinach. Przed objęciem tej roli kierowała wieloma inicjatywami jako analityk danych i inżynier ML w czołowych globalnych firmach z branży finansowej i detalicznej. Posiada tytuł magistra informatyki ze specjalizacją Data Science uzyskany na Uniwersytecie Kolorado w Boulder.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Rozbijacz Grewal jest Sr Ai/ML Specialist Solutions Architect z AWS. Obecnie skupia się na serwowaniu modeli i MLOpów na SageMakerze. Wcześniej pracował jako inżynier ds. uczenia maszynowego budując i hostując modele. Poza pracą lubi grać w tenisa i jeździć na rowerze po górskich szlakach.

Nowe funkcje dla Amazon SageMaker Pipelines i Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.promień li jest starszym analitykiem danych z AWS Professional Services. Jego specjalność skupia się na budowaniu i operacjonalizacji rozwiązań AI/ML dla klientów różnej wielkości, od startupów po organizacje korporacyjne. Poza pracą Ray lubi fitness i podróże.

Znak czasu:

Więcej z Uczenie maszynowe AWS