Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Новые функции Amazon SageMaker Pipelines и Amazon SageMaker SDK

Конвейеры Amazon SageMaker позволяет специалистам по данным и инженерам по машинному обучению (ML) автоматизировать рабочие процессы обучения, что помогает создать повторяемый процесс для организации шагов разработки модели для быстрого экспериментирования и переобучения модели. Вы можете автоматизировать весь рабочий процесс построения модели, включая подготовку данных, разработку функций, обучение модели, настройку модели и проверку модели, а также каталогизировать ее в реестре моделей. Вы можете настроить конвейеры для автоматического запуска через регулярные промежутки времени или при возникновении определенных событий, или вы можете запускать их вручную по мере необходимости.

В этой статье мы расскажем о некоторых улучшениях Создатель мудреца Амазонки SDK и представить новые функции Amazon SageMaker Pipelines, упрощающие специалистам по машинному обучению создание и обучение моделей машинного обучения.

Pipelines продолжает совершенствовать свой опыт разработчиков, и благодаря этим недавним выпускам вы теперь можете использовать сервис более индивидуально:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Обновлена ​​документация по PipelineVariable использование для оценщика, процессора, тюнера, преобразователя и базовых классов моделей, моделей Amazon и моделей фреймворка. В новых версиях SDK будут внесены дополнительные изменения для поддержки всех подклассов оценщиков и процессоров.
  • 2.90.0 – Наличие Шаг модели для задач создания и регистрации ресурсов интегрированной модели.
  • 2.88.2 – Наличие PipelineSession для управляемого взаимодействия с сущностями и ресурсами SageMaker.
  • 2.88.2 – Совместимость подклассов для этапы задания конвейера рабочего процесса поэтому вы можете создавать абстракции заданий, а также настраивать и запускать задания обработки, обучения, преобразования и настройки так же, как и без конвейера.
  • 2.76.0 – Наличие Шаг неудачи условно остановить конвейер со статусом отказа.

В этом посте мы познакомим вас с рабочим процессом с использованием образца набора данных с акцентом на построение и развертывание модели, чтобы продемонстрировать, как реализовать новые функции Pipelines. К концу у вас должно быть достаточно информации, чтобы успешно использовать эти новые функции и упростить рабочие нагрузки машинного обучения.

Обзор возможностей

Pipelines предлагает следующие новые функции:

  • Аннотация переменной конвейера – Некоторые параметры метода принимают несколько типов входных данных, включая PipelineVariablesи добавлена ​​дополнительная документация, поясняющая, где PipelineVariables поддерживаются как последней стабильной версией документации SageMaker SDK, так и сигнатурой инициализации функций. Например, в следующем оценщике TensorFlow сигнатура инициализации теперь показывает, что model_dir и image_uri поддержка PipelineVariables, тогда как другие параметры этого не делают. Для получения дополнительной информации см. Оценщик TensorFlow.
    • До:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • После:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Конвейерная сессияPipelineSession — это новая концепция, введенная для обеспечения единства SageMaker SDK и вводит ленивую инициализацию ресурсов конвейера (вызовы выполнения перехватываются, но не выполняются до тех пор, пока конвейер не будет создан и запущен). PipelineSession контекст наследует SageMakerSession и реализует удобные методы взаимодействия с другими объектами и ресурсами SageMaker, такими как задания обучения, конечные точки и наборы входных данных, хранящиеся в Простой сервис хранения Amazon (Amazon S3).
  • Совместимость подкласса с этапами задания конвейера рабочего процесса – Теперь вы можете создавать абстракции заданий, а также настраивать и запускать задания обработки, обучения, преобразования и настройки так же, как и без конвейера.
    • Например, создание шага обработки с помощью SKLearnProcessor ранее требовалось следующее:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Как мы видим в предыдущем коде, ProcessingStep должен выполнять в основном ту же логику предварительной обработки, что и .run, просто не инициируя вызов API для запуска задания. Но теперь, когда совместимость подклассов включена с помощью шагов задания конвейера рабочего процесса, мы объявляем step_args Аргумент, который принимает логику предварительной обработки с помощью .run, поэтому вы можете создать абстракцию задания и настроить ее так, как если бы вы использовали ее без конвейеров. Мы также проходим в pipeline_session, Которая является PipelineSession объект вместо sagemaker_session чтобы убедиться, что вызовы запуска перехватываются, но не вызываются до тех пор, пока конвейер не будет создан и запущен. См. следующий код:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Шаг модели (упрощенный подход с этапами создания и регистрации модели) –Pipelines предлагает два типа шагов для интеграции с моделями SageMaker: CreateModelStep и RegisterModel. Теперь вы можете добиться того и другого, используя только ModelStep тип. Обратите внимание, что PipelineSession требуется для достижения этого. Это обеспечивает сходство между этапами конвейера и SDK.
    • До:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • После:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Неисправный шаг (условная остановка трассы трубопровода)FailStep позволяет остановить конвейер со статусом отказа, если выполняется условие, например, если оценка модели ниже определенного порога.

Обзор решения

В этом решении вашей точкой входа является Студия Amazon SageMaker интегрированная среда разработки (IDE) для быстрого экспериментирования. Studio предлагает среду для комплексного управления Pipelines. С помощью Studio вы можете обойти Консоль управления AWS для управления всем вашим рабочим процессом. Дополнительную информацию об управлении конвейерами из Studio см. Просмотр, отслеживание и выполнение конвейеров SageMaker в SageMaker Studio.

На следующей диаграмме показана высокоуровневая архитектура рабочего процесса машинного обучения с различными этапами обучения и формирования выводов с использованием новых функций.

Конвейер включает в себя следующие этапы:

  1. Предварительно обработайте данные для создания необходимых функций и разделения данных на наборы данных для обучения, проверки и тестирования.
  2. Создайте задание обучения с помощью платформы SageMaker XGBoost.
  3. Оцените обученную модель, используя набор тестовых данных.
  4. Проверьте, превышает ли показатель AUC заранее заданный порог.
    • Если показатель AUC меньше порогового значения, остановите запуск конвейера и отметьте его как неудачный.
    • Если показатель AUC превышает пороговое значение, создайте модель SageMaker и зарегистрируйте ее в реестре моделей SageMaker.
  5. Примените пакетное преобразование к данному набору данных, используя модель, созданную на предыдущем шаге.

Предпосылки

Чтобы следовать этому сообщению, вам понадобится учетная запись AWS с Домен студии.

Pipelines напрямую интегрированы с объектами и ресурсами SageMaker, поэтому вам не нужно взаимодействовать с какими-либо другими сервисами AWS. Вам также не нужно управлять какими-либо ресурсами, поскольку это полностью управляемая служба, а это означает, что она создает ресурсы и управляет ими для вас. Для получения дополнительной информации о различных компонентах SageMaker, которые являются как автономными API-интерфейсами Python, так и интегрированными компонентами Studio, см. Страница продукта SageMaker.

Прежде чем приступить к работе, установите SageMaker SDK версии >= 2.104.0 и xlrd >=1.0.0 в записной книжке Studio, используя следующий фрагмент кода:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Рабочий процесс машинного обучения

Для этого поста вы используете следующие компоненты:

  • Подготовка данных
    • Обработка SageMaker – SageMaker Processing – это полностью управляемый сервис, позволяющий выполнять пользовательские преобразования данных и разработку функций для рабочих нагрузок машинного обучения.
  • Построение модели
  • Обучение и оценка модели
    • Обучение в один клик – Функция распределенного обучения SageMaker. SageMaker предоставляет распределенные обучающие библиотеки для параллелизма данных и параллелизма моделей. Библиотеки оптимизированы для среды обучения SageMaker, помогают адаптировать ваши распределенные учебные задания к SageMaker и повышают скорость и производительность обучения.
    • SageMaker эксперименты — Эксперименты — это возможность SageMaker, которая позволяет упорядочивать, отслеживать, сравнивать и оценивать ваши итерации машинного обучения.
    • Пакетное преобразование SageMaker – Пакетное преобразование или автономная оценка – это управляемая служба в SageMaker, которая позволяет прогнозировать больший набор данных с использованием моделей машинного обучения.
  • Оркестрация рабочего процесса

Конвейер SageMaker — это ряд взаимосвязанных шагов, определяемых определением конвейера JSON. Он кодирует конвейер с помощью направленного ациклического графа (DAG). DAG предоставляет информацию о требованиях и взаимосвязях между каждым этапом конвейера, а его структура определяется зависимостями данных между этапами. Эти зависимости создаются, когда свойства выходных данных шага передаются в качестве входных данных для другого шага.

На следующей диаграмме показаны различные шаги в конвейере SageMaker (для варианта использования прогнозирования оттока), где SageMaker выводит связи между шагами на основе входных и выходных данных, определенных определениями шагов.

В следующих разделах описывается создание каждого этапа конвейера и запуск всего созданного конвейера.

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Структура проекта

Начнем со структуры проекта:

  • /sm-pipelines-end-to-end-example – Название проекта
    • /данные – Наборы данных
    • /трубопроводы – Файлы кода для компонентов конвейера.
      • /отток клиентов
        • preprocess.py
        • оценить.py
    • sagemaker-pipelines-project.ipynb – Блокнот, демонстрирующий рабочий процесс моделирования с использованием новых функций Pipelines.

Скачать набор данных

Чтобы следовать этому сообщению, вам необходимо скачать и сохранить образец набора данных в папке данных в домашнем каталоге проекта, при этом файл сохраняется в Эластичная файловая система Amazon (Amazon EFS) в среде Studio.

Сборка компонентов конвейера

Теперь вы готовы создавать компоненты конвейера.

Импорт операторов и объявление параметров и констант

Создайте блокнот Studio с именем sagemaker-pipelines-project.ipynb в домашнем каталоге проекта. Введите следующий блок кода в ячейку и запустите ячейку, чтобы настроить SageMaker и клиентские объекты S3, создать PipelineSessionи настройте местоположение корзины S3, используя корзину по умолчанию, которая поставляется с сеансом SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Конвейеры поддерживают параметризацию, которая позволяет указывать входные параметры во время выполнения без изменения кода конвейера. Вы можете использовать модули, доступные в разделе sagemaker.workflow.parameters модуль, например ParameterInteger, ParameterFloatи ParameterString, чтобы указать параметры конвейера различных типов данных. Запустите следующий код, чтобы настроить несколько входных параметров:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Создать пакетный набор данных

Создайте пакетный набор данных, который вы будете использовать позже на этапе пакетного преобразования:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Загрузить данные в корзину S3

Загрузите наборы данных в Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Определите сценарий обработки и этап обработки.

На этом шаге вы подготавливаете скрипт Python для разработки функций, одного горячего кодирования и курируете обучение, проверку и тестовые разделения, которые будут использоваться для построения модели. Запустите следующий код, чтобы создать сценарий обработки:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Затем запустите следующий блок кода, чтобы создать экземпляр процессора, и шаг Pipelines, чтобы запустить сценарий обработки. Поскольку сценарий обработки написан на Pandas, вы используете SKLearnПроцессор. Трубопроводы ProcessingStep Функция принимает следующие аргументы: процессор, входные местоположения S3 для необработанных наборов данных и выходные местоположения S3 для сохранения обработанных наборов данных.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Определение шага обучения

Настройте обучение модели с помощью оценщика SageMaker XGBoost и конвейеров. TrainingStep функция:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Определите сценарий оценки и этап оценки модели.

Запустите следующий блок кода, чтобы оценить модель после обучения. Этот сценарий инкапсулирует логику проверки соответствия показателя AUC указанному порогу.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Затем запустите следующий блок кода, чтобы создать экземпляр процессора, и шаг Pipelines, чтобы запустить сценарий оценки. Поскольку оценочный сценарий использует пакет XGBoost, вы используете ScriptProcessor вместе с изображением XGBoost. Трубопроводы ProcessingStep Функция принимает следующие аргументы: процессор, входные местоположения S3 для необработанных наборов данных и выходные местоположения S3 для сохранения обработанных наборов данных.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Определите шаг создания модели

Запустите следующий блок кода, чтобы создать модель SageMaker с помощью шага модели Pipelines. На этом шаге выходные данные этапа обучения используются для упаковки модели для развертывания. Обратите внимание, что значение аргумента типа экземпляра передается с использованием параметра Pipelines, который вы определили ранее в посте.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Определите шаг пакетного преобразования

Запустите следующий блок кода, чтобы запустить пакетное преобразование с использованием обученной модели с пакетным вводом, созданным на первом этапе:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Определить шаг модели регистрации

Следующий код регистрирует модель в реестре моделей SageMaker с помощью шага модели Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Определите шаг сбоя для остановки конвейера

Следующий код определяет шаг сбоя конвейеров, чтобы остановить запуск конвейера с сообщением об ошибке, если показатель AUC не соответствует определенному порогу:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Определите шаг условия для проверки оценки AUC

Следующий код определяет шаг условия для проверки оценки AUC и условного создания модели, запуска пакетного преобразования и регистрации модели в реестре модели или остановки запуска конвейера в состоянии сбоя:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Создайте и запустите конвейер

После определения всех шагов компонента вы можете собрать их в объект Pipelines. Вам не нужно указывать порядок конвейера, поскольку Pipelines автоматически выводит последовательность порядка на основе зависимостей между шагами.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Запустите следующий код в ячейке записной книжки. Если конвейер уже существует, код обновляет конвейер. Если конвейер не существует, он создает новый.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Заключение

В этом посте мы представили некоторые из новых функций, которые теперь доступны в Pipelines, а также другие встроенные функции SageMaker и алгоритм XGBoost для разработки, итерации и развертывания модели для прогнозирования оттока. Решение может быть дополнено дополнительными источниками данных

для реализации собственного рабочего процесса машинного обучения. Дополнительные сведения о шагах, доступных в рабочем процессе Pipelines, см. Конвейер построения моделей Amazon SageMaker и Рабочие процессы SageMaker, Примеры AWS SageMaker В репозитории GitHub есть больше примеров различных вариантов использования Pipelines.


Об авторах

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Джерри ПэнЕще является инженером-разработчиком программного обеспечения в AWS SageMaker. Он занимается созданием комплексной крупномасштабной системы MLOps от обучения до мониторинга моделей в производстве. Он также увлечен тем, чтобы донести концепцию MLOps до более широкой аудитории.

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Девэн Ци — инженер-разработчик программного обеспечения в AWS. В настоящее время она занимается разработкой и улучшением конвейеров SageMaker. Вне работы она увлекается игрой на виолончели.

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Гаятри Ганакота — старший инженер по машинному обучению в AWS Professional Services. Она увлечена разработкой, развертыванием и объяснением решений AI/ML в различных областях. До этой должности она руководила несколькими инициативами в качестве специалиста по данным и инженера по машинному обучению в ведущих мировых компаниях в сфере финансов и розничной торговли. Она имеет степень магистра компьютерных наук, специализирующуюся на науке о данных, полученную в Колорадском университете в Боулдере.

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Рупиндер Гревал является старшим архитектором решений Ai/ML Specialist в AWS. В настоящее время он занимается обслуживанием моделей и MLOps в SageMaker. До этой должности он работал инженером по машинному обучению, создавая и размещая модели. Вне работы он любит играть в теннис и кататься на велосипеде по горным тропам.

Новые функции для Amazon SageMaker Pipelines и Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Рэй Ли является старшим специалистом по данным в AWS Professional Services. Его специальность сосредоточена на создании и внедрении решений AI/ML для клиентов разного размера, от стартапов до корпоративных организаций. Вне работы Рэй увлекается фитнесом и путешествует.

Отметка времени:

Больше от Машинное обучение AWS