Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK

Трубопроводи Amazon SageMaker дає змогу спеціалістам із обробки даних та інженерам з машинного навчання (ML) автоматизувати робочі процеси навчання, що допомагає створити повторюваний процес для оркестрування етапів розробки моделі для швидкого експериментування та перенавчання моделі. Ви можете автоматизувати весь робочий процес побудови моделі, включаючи підготовку даних, розробку функцій, навчання моделі, налаштування моделі та перевірку моделі, а також каталогізувати це в реєстрі моделей. Ви можете налаштувати конвеєри на автоматичний запуск через регулярні проміжки часу або коли спрацьовують певні події, або ви можете запустити їх вручну за потреби.

У цій публікації ми висвітлюємо деякі вдосконалення в Amazon SageMaker SDK і представити нові функції Amazon SageMaker Pipelines, які спрощують для практиків ML створення та навчання моделей ML.

Pipelines продовжує вдосконалювати свій досвід розробників, і з цими останніми випусками тепер ви можете використовувати службу в більш персоналізований спосіб:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Оновлена ​​документація по PipelineVariable використання для оцінювачів, процесорів, тюнерів, трансформаторів і базових класів моделей, моделей Amazon і каркасних моделей. З новими версіями SDK будуть внесені додаткові зміни для підтримки всіх підкласів оцінювачів і процесорів.
  • 2.90.0 – Наявність ModelStep для створення і реєстрації ресурсу інтегрованої моделі.
  • 2.88.2 – Наявність PipelineSession для керованої взаємодії з об’єктами та ресурсами SageMaker.
  • 2.88.2 – Сумісність підкласів для етапи роботи конвеєра робочого процесу тож ви можете створювати абстракції завдань, налаштовувати та запускати завдання обробки, навчання, трансформації та налаштування, як без конвеєра.
  • 2.76.0 – Наявність FailStep для умовної зупинки трубопроводу зі статусом відмови.

У цій публікації ми проведемо вас через робочий процес, використовуючи зразок набору даних з акцентом на створення та розгортання моделі, щоб продемонструвати, як реалізувати нові функції Pipelines. Зрештою, ви маєте мати достатньо інформації, щоб успішно використовувати ці новіші функції та спростити робочі навантаження ML.

Огляд функцій

Pipelines пропонує такі нові функції:

  • Анотація змінної конвеєра – Певні параметри методу приймають кілька типів введення, включно з PipelineVariables, а також додано додаткову документацію, щоб уточнити, де PipelineVariables підтримуються як в останній стабільній версії документації SageMaker SDK, так і в сигнатурі ініціалізації функцій. Наприклад, у наведеному нижче інструменті оцінки TensorFlow сигнатура ініціалізації тепер показує це model_dir та image_uri підтримка PipelineVariables, тоді як інші параметри ні. Для отримання додаткової інформації див Оцінювач TensorFlow.
    • До:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Після:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Конвеєрний сеанс - PipelineSession це нова концепція, представлена ​​для забезпечення єдності в SageMaker SDK і впроваджує відкладену ініціалізацію ресурсів конвеєра (виклики запуску захоплюються, але не запускаються, доки конвеєр не буде створено та запущено). The PipelineSession контекст успадковує SageMakerSession і реалізує зручні методи для взаємодії з іншими об’єктами та ресурсами SageMaker, такими як навчальні завдання, кінцеві точки та набори вхідних даних, що зберігаються в Служба простого зберігання Amazon (Amazon S3).
  • Сумісність підкласу з етапами роботи конвеєра робочого процесу – Тепер ви можете створювати абстракції завдань, а також налаштовувати та запускати завдання обробки, навчання, трансформації та налаштування, як без конвеєра.
    • Наприклад, створення кроку обробки з SKLearnProcessor раніше вимагалося наступне:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Як ми бачимо в попередньому коді, ProcessingStep потрібно виконати в основному ту саму логіку попередньої обробки, що й .run, лише не ініціюючи виклик API для запуску завдання. Але завдяки сумісності підкласів із кроками роботи конвеєра робочого процесу, ми оголошуємо step_args аргумент, який використовує логіку попередньої обробки з .run, щоб ви могли створити абстракцію завдання та налаштувати її так, як ви використовували б без конвеєрів. Ми також проходимо в pipeline_session, Який представляє собою PipelineSession об'єкт, замість sagemaker_session щоб переконатися, що виклики запуску захоплюються, але не викликаються, доки не буде створено та запущено конвеєр. Перегляньте наступний код:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Крок моделі (спрощений підхід із етапами створення та реєстрації моделі) – Pipelines пропонує два типи кроків для інтеграції з моделями SageMaker: CreateModelStep та RegisterModel. Тепер ви можете досягти обох, використовуючи лише ModelStep типу. Зверніть увагу, що a PipelineSession щоб досягти цього. Це забезпечує подібність між етапами конвеєра та SDK.
    • До:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Після:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Крок відмови (умовна зупинка ділянки трубопроводу) - FailStep дозволяє зупинити конвеєр зі статусом відмови, якщо виконується умова, наприклад, якщо оцінка моделі нижче певного порогу.

Огляд рішення

У цьому рішенні вашою точкою входу є Студія Amazon SageMaker інтегроване середовище розробки (IDE) для швидкого експериментування. Studio пропонує середовище для керування наскрізними конвеєрами. За допомогою Studio ви можете обійти Консоль управління AWS для всього керування робочим процесом. Щоб отримати додаткові відомості про керування конвеєрами з Studio, див Переглядайте, відстежуйте та виконуйте конвеєри SageMaker у SageMaker Studio.

На наступній діаграмі показано високорівневу архітектуру робочого процесу ML із різними кроками для навчання та створення висновків за допомогою нових функцій.

Конвеєр включає в себе наступні кроки:

  1. Попередня обробка даних для створення необхідних функцій і розділення даних на набори даних навчання, перевірки та тестування.
  2. Створіть навчальну роботу за допомогою фреймворку SageMaker XGBoost.
  3. Оцініть навчену модель за допомогою тестового набору даних.
  4. Перевірте, чи показник AUC перевищує заздалегідь визначене порогове значення.
    • Якщо показник AUC нижчий за порогове значення, зупиніть запуск конвеєра та позначте його як невдалий.
    • Якщо показник AUC перевищує порогове значення, створіть модель SageMaker і зареєструйте її в реєстрі моделей SageMaker.
  5. Застосуйте пакетне перетворення до заданого набору даних за допомогою моделі, створеної на попередньому кроці.

Передумови

Щоб слідувати цій публікації, вам потрібен обліковий запис AWS з a Студійний домен.

Pipelines інтегровано безпосередньо з об’єктами та ресурсами SageMaker, тому вам не потрібно взаємодіяти з іншими службами AWS. Вам також не потрібно керувати будь-якими ресурсами, оскільки це повністю керована служба, що означає, що вона створює ресурси та керує ними за вас. Щоб отримати додаткові відомості про різні компоненти SageMaker, які є автономними API Python разом із інтегрованими компонентами Studio, див. Сторінка продукту SageMaker.

Перш ніж почати, інсталюйте SageMaker SDK версії >= 2.104.0 і xlrd >=1.0.0 у блокноті Studio, використовуючи такий фрагмент коду:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Робочий процес ML

Для цієї публікації ви використовуєте такі компоненти:

  • Підготовка даних
    • Обробка SageMaker – SageMaker Processing – це повністю керована служба, яка дозволяє запускати спеціальні перетворення даних і розробляти функції для робочих навантажень ML.
  • Будівництво моделі
  • Модель навчання та оцінювання
    • Навчання в один клік – Функція розподіленого навчання SageMaker. SageMaker надає розподілені навчальні бібліотеки для паралелізму даних і моделей. Бібліотеки оптимізовані для навчального середовища SageMaker, допомагають адаптувати ваші розподілені навчальні завдання до SageMaker і підвищують швидкість і продуктивність навчання.
    • Експерименти SageMaker – Експерименти – це можливість SageMaker, яка дозволяє організовувати, відстежувати, порівнювати та оцінювати ваші ітерації ML.
    • Пакетне перетворення SageMaker – Пакетне перетворення або офлайн-оцінка – це керована служба в SageMaker, яка дозволяє прогнозувати на більшому наборі даних за допомогою ваших моделей ML.
  • Оркестровка робочого процесу

Конвеєр SageMaker — це серія взаємопов’язаних кроків, визначених визначенням конвеєра JSON. Він кодує конвеєр за допомогою спрямованого ациклічного графа (DAG). DAG надає інформацію про вимоги та зв’язки між кожним кроком конвеєра, а його структура визначається залежностями даних між кроками. Ці залежності створюються, коли властивості результату кроку передаються як вхідні дані для іншого кроку.

На наступній діаграмі показано різні кроки в конвеєрі SageMaker (для випадку використання передбачення відтоку), де SageMaker визначає зв’язки між кроками на основі вхідних і вихідних даних, визначених визначеннями кроків.

У наступних розділах описано створення кожного кроку конвеєра та запуск усього конвеєра після створення.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Структура проекту

Почнемо зі структури проекту:

  • /sm-pipelines-end-to-end-example – Назва проекту
    • / data – Набори даних
    • /трубопроводи – Файли коду для компонентів трубопроводу
      • /customerchurn
        • preprocess.py
        • оцінити.py
    • sagemaker-pipelines-project.ipynb – Блокнот, який описує робочий процес моделювання за допомогою нових функцій Pipelines

Завантажте набір даних

Щоб слідувати цій публікації, вам потрібно завантажити та зберегти зразок набору даних у папці даних у домашньому каталозі проекту, де зберігається файл Еластична файлова система Amazon (Amazon EFS) у середовищі Studio.

Побудуйте компоненти трубопроводу

Тепер ви готові до створення компонентів трубопроводу.

Імпорт операторів і оголошення параметрів і констант

Створіть блокнот Studio під назвою sagemaker-pipelines-project.ipynb у домашньому каталозі проекту. Введіть наступний блок коду в комірку та запустіть комірку, щоб налаштувати клієнтські об’єкти SageMaker і S3, створити PipelineSessionі налаштуйте розташування сегмента S3 за допомогою стандартного сегмента, який постачається разом із сеансом SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Конвеєри підтримують параметризацію, яка дозволяє вказувати вхідні параметри під час виконання, не змінюючи код конвеєра. Ви можете використовувати модулі, доступні в розділі sagemaker.workflow.parameters модуль, наприклад ParameterInteger, ParameterFloat та ParameterString, щоб указати параметри конвеєра різних типів даних. Запустіть наступний код, щоб налаштувати кілька вхідних параметрів:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Створіть пакетний набір даних

Згенеруйте пакетний набір даних, який ви використаєте пізніше на етапі пакетного перетворення:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Завантажте дані в сегмент S3

Завантажте набори даних в Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Визначте сценарій обробки та крок обробки

На цьому етапі ви готуєте сценарій Python для розробки функцій, одного гарячого кодування та кураторів для навчання, перевірки та тестування, які використовуватимуться для створення моделі. Запустіть наступний код, щоб створити сценарій обробки:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Далі запустіть наступний блок коду, щоб створити екземпляр процесора, і крок Pipelines, щоб запустити сценарій обробки. Оскільки сценарій обробки написаний у Pandas, ви використовуєте a SKLearnProcessor. Трубопроводи ProcessingStep функція приймає такі аргументи: процесор, розташування вхідних S3 для необроблених наборів даних і вихідних розташування S3 для збереження оброблених наборів даних.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Визначте крок навчання

Налаштуйте навчання моделі за допомогою оцінювача SageMaker XGBoost і конвеєрів TrainingStep функція:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Визначте сценарій оцінки та крок оцінки моделі

Запустіть наступний блок коду, щоб оцінити модель після навчання. Цей сценарій інкапсулює логіку перевірки відповідності показника AUC вказаному пороговому значенню.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Далі запустіть наступний блок коду, щоб створити екземпляр процесора, і крок Pipelines, щоб запустити сценарій оцінки. Оскільки оціночний сценарій використовує пакет XGBoost, ви використовуєте a ScriptProcessor разом із зображенням XGBoost. Трубопроводи ProcessingStep функція приймає такі аргументи: процесор, розташування вхідних S3 для необроблених наборів даних і вихідних розташування S3 для збереження оброблених наборів даних.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Визначте крок створення моделі

Запустіть наступний блок коду, щоб створити модель SageMaker за допомогою кроку моделі Pipelines. На цьому етапі використовуються результати етапу навчання для упаковки моделі для розгортання. Зверніть увагу, що значення для аргументу типу екземпляра передається за допомогою параметра Pipelines, який ви визначили раніше в публікації.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Визначте крок пакетного перетворення

Запустіть наступний блок коду, щоб виконати пакетне перетворення за допомогою навченої моделі з пакетним введенням, створеним на першому кроці:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Визначте крок моделі реєстру

Наступний код реєструє модель у реєстрі моделей SageMaker за допомогою кроку моделі Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Визначте крок невдачі, щоб зупинити конвеєр

Наступний код визначає крок Pipelines fail для зупинки роботи конвеєра з повідомленням про помилку, якщо показник AUC не відповідає визначеному порогу:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Визначте крок умови для перевірки оцінки AUC

Наступний код визначає крок умови для перевірки оцінки AUC і умовного створення моделі та запуску пакетного перетворення та реєстрації моделі в реєстрі моделей або зупинки виконання конвеєра в невдалому стані:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Побудуйте та запустіть трубопровід

Після визначення всіх кроків компонентів ви можете зібрати їх в об’єкт Pipelines. Вам не потрібно вказувати порядок конвеєра, оскільки Pipelines автоматично визначає послідовність порядку на основі залежностей між кроками.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Запустіть наведений нижче код у клітинку блокнота. Якщо конвеєр вже існує, код оновлює конвеєр. Якщо конвеєр не існує, він створює новий.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Висновок

У цій публікації ми представили деякі нові функції, які тепер доступні в Pipelines, а також інші вбудовані функції SageMaker і алгоритм XGBoost для розробки, ітерації та розгортання моделі для прогнозування відтоку. Рішення можна розширити додатковими джерелами даних

для впровадження власного робочого процесу ML. Додаткову інформацію про кроки, доступні в робочому процесі Pipelines, див Конвеєр побудови моделі Amazon SageMaker та Робочі процеси SageMaker, Приклади AWS SageMaker У сховищі GitHub є більше прикладів різноманітних випадків використання конвеєрів.


Про авторів

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Джеррі Пенг є інженером з розробки програмного забезпечення в AWS SageMaker. Він зосереджується на створенні наскрізної великомасштабної системи MLOps від навчання до моніторингу моделі у виробництві. Він також прагне донести концепцію MLO до ширшої аудиторії.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Девен Ци є інженером з розробки програмного забезпечення в AWS. Зараз вона зосереджена на розробці та вдосконаленні конвеєрів SageMaker. Поза роботою вона любить грати на віолончелі.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Гаятрі Ганакота є старшим інженером машинного навчання в AWS Professional Services. Вона захоплена розробкою, розгортанням і поясненням рішень AI/ML у різних областях. До цієї посади вона керувала кількома ініціативами в якості спеціаліста з обробки даних та інженера з ML у провідних світових компаніях у сфері фінансів і роздрібної торгівлі. Вона має ступінь магістра комп’ютерних наук, спеціалізуючись на Data Science, в Університеті Колорадо, Боулдер.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Рупіндер Гревал є старшим архітектором рішень Ai/ML в AWS. Зараз він зосереджується на обслуговуванні моделей і MLO на SageMaker. До цієї посади він працював інженером машинного навчання, створюючи та розміщуючи моделі. Поза роботою він любить грати в теніс і їздити на велосипеді гірськими стежками.

Нові функції для Amazon SageMaker Pipelines і Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Рей Лі є старшим спеціалістом із обробки даних у AWS Professional Services. Його спеціалізація зосереджена на розробці та впровадженні рішень AI/ML для клієнтів різного розміру, починаючи від стартапів і закінчуючи корпоративними організаціями. Крім роботи, Рей захоплюється фітнесом і подорожами.

Часова мітка:

Більше від AWS Машинне навчання