Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.

Amazon SageMaker 파이프라인 및 Amazon SageMaker SDK의 새로운 기능

Amazon SageMaker 파이프 라인 데이터 과학자와 머신 러닝(ML) 엔지니어가 교육 워크플로를 자동화할 수 있도록 하여 신속한 실험 및 모델 재교육을 위해 모델 개발 단계를 오케스트레이션하는 반복 가능한 프로세스를 생성하는 데 도움이 됩니다. 데이터 준비, 기능 엔지니어링, 모델 교육, 모델 조정 및 모델 유효성 검사를 포함한 전체 모델 빌드 워크플로를 자동화하고 모델 레지스트리에 분류할 수 있습니다. 정기적인 간격으로 또는 특정 이벤트가 트리거될 때 자동으로 실행되도록 파이프라인을 구성하거나 필요에 따라 수동으로 실행할 수 있습니다.

이 게시물에서는 몇 가지 개선 사항을 강조합니다. 아마존 세이지 메이커 SDK 및 Amazon SageMaker Pipelines의 새로운 기능을 소개하여 ML 실무자가 ML 모델을 더 쉽게 구축하고 교육할 수 있도록 합니다.

Pipelines는 계속해서 개발자 경험을 혁신하고 있으며 이러한 최신 릴리스를 통해 이제 보다 맞춤화된 방식으로 서비스를 사용할 수 있습니다.

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – 업데이트된 문서 PipelineVariable 추정기, 프로세서, 튜너, 변환기 및 모델 기본 클래스, Amazon 모델 및 프레임워크 모델에 대한 사용. 추정기 및 프로세서의 모든 하위 클래스를 지원하기 위해 최신 버전의 SDK와 함께 추가 변경 사항이 있을 것입니다.
  • 2.90.0 – 가용성 모델스텝 통합 모델 리소스 생성 및 등록 작업용.
  • 2.88.2 – 가용성 파이프라인세션 SageMaker 엔터티 및 리소스와의 관리 상호 작용을 위해.
  • 2.88.2 – 서브클래스 호환성 워크플로 파이프라인 작업 단계 따라서 파이프라인 없이 작업을 추상화하고 처리, 교육, 변환 및 조정 작업을 구성 및 실행할 수 있습니다.
  • 2.76.0 – 가용성 페일스텝 실패 상태의 파이프라인을 조건부로 중지합니다.

이 게시물에서는 파이프라인의 새로운 기능을 구현하는 방법을 보여주기 위해 모델 구축 및 배포에 중점을 둔 샘플 데이터 세트를 사용하는 워크플로를 안내합니다. 결국 이러한 새로운 기능을 성공적으로 사용하고 ML 워크로드를 단순화하기에 충분한 정보를 갖게 됩니다.

기능 개요

파이프라인은 다음과 같은 새로운 기능을 제공합니다.

  • 파이프라인 변수 주석 – 특정 메소드 매개변수는 다음을 포함한 여러 입력 유형을 허용합니다. PipelineVariables, 그리고 추가 문서가 어디에 있는지 명확히 하기 위해 추가되었습니다. PipelineVariables 최신 안정 버전의 SageMaker SDK 설명서와 함수의 초기화 서명에서 모두 지원됩니다. 예를 들어 다음 TensorFlow 추정기에서 init 서명은 이제 다음을 보여줍니다. model_dirimage_uri SUPPORT PipelineVariables, 다른 매개변수는 그렇지 않습니다. 자세한 내용은 텐서플로우 추정기.
    • 전에:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • 후 :
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • 파이프라인 세션 - 파이프라인세션 SageMaker SDK 전체에 통합을 가져오기 위해 도입된 새로운 개념이며 파이프라인 리소스의 지연 초기화를 도입합니다(실행 호출은 캡처되지만 파이프라인이 생성 및 실행될 때까지 실행되지 않음). 그만큼 PipelineSession 컨텍스트 상속 SageMakerSession 교육 작업, 엔드포인트 및 에 저장된 입력 데이터 세트와 같은 다른 SageMaker 엔터티 및 리소스와 상호 작용할 수 있는 편리한 방법을 구현합니다. 아마존 단순 스토리지 서비스 (아마존 S3).
  • 워크플로 파이프라인 작업 단계와의 하위 클래스 호환성 – 이제 파이프라인 없이 작업을 추상화하고 처리, 교육, 변환 및 조정 작업을 구성 및 실행할 수 있습니다.
    • 예를 들어 다음을 사용하여 처리 단계를 생성합니다. SKLearnProcessor 이전에는 다음이 필요했습니다.
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • 앞의 코드에서 볼 수 있듯이, ProcessingStep 기본적으로 다음과 같은 전처리 논리를 수행해야 합니다. .run, API 호출을 시작하지 않고 작업을 시작합니다. 그러나 이제 워크플로 파이프라인 작업 단계에서 하위 클래스 호환성이 활성화되어 다음을 선언합니다. step_args 작업 추상화를 빌드하고 파이프라인 없이 사용하는 것처럼 구성할 수 있도록 .run을 사용하여 사전 처리 논리를 사용하는 인수입니다. 우리는 또한 통과 pipeline_session, 어느입니다 PipelineSession 대신에 개체 sagemaker_session 실행 호출이 캡처되지만 파이프라인이 생성되고 실행될 때까지 호출되지 않도록 합니다. 다음 코드를 참조하십시오.
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • 모델 단계(모델 생성 및 등록 단계를 통한 간소화된 접근 방식) -Pipelines는 SageMaker 모델과 통합할 수 있는 두 가지 단계 유형을 제공합니다. CreateModelStepRegisterModel. 이제 만 사용하여 두 가지를 모두 달성할 수 있습니다. ModelStep 유형. 참고 PipelineSession 이를 달성하기 위해 필요합니다. 이는 파이프라인 단계와 SDK 간에 유사성을 제공합니다.
    • 전에:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • 후 :
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • 실패 단계(파이프라인 실행의 조건부 중지) - FailStep 모델 점수가 특정 임계값 미만인 경우와 같이 조건이 충족되면 파이프라인이 실패 상태로 중지될 수 있습니다.

솔루션 개요

이 솔루션에서 진입점은 아마존 세이지 메이커 스튜디오 신속한 실험을 위한 통합 개발 환경(IDE). Studio는 종단 간 파이프라인 경험을 관리할 수 있는 환경을 제공합니다. Studio를 사용하면 우회할 수 있습니다. AWS 관리 콘솔 전체 워크플로 관리를 위해 Studio 내에서 파이프라인 관리에 대한 자세한 내용은 다음을 참조하세요. SageMaker Studio에서 SageMaker 파이프라인 보기, 추적 및 실행.

다음 다이어그램은 새로운 기능을 사용하여 추론을 훈련하고 생성하는 다양한 단계가 있는 ML 워크플로의 상위 수준 아키텍처를 보여줍니다.

파이프라인에는 다음 단계가 포함됩니다.

  1. 데이터를 사전 처리하여 필요한 기능을 구축하고 데이터를 학습, 검증 및 테스트 데이터 세트로 분할합니다.
  2. SageMaker XGBoost 프레임워크를 사용하여 교육 작업을 생성합니다.
  3. 테스트 데이터 세트를 사용하여 훈련된 모델을 평가합니다.
  4. AUC 점수가 미리 정의된 임계값을 초과하는지 확인합니다.
    • AUC 점수가 임계값보다 작으면 파이프라인 실행을 중지하고 실패한 것으로 표시합니다.
    • AUC 점수가 임계값보다 크면 SageMaker 모델을 생성하고 SageMaker 모델 레지스트리에 등록합니다.
  5. 이전 단계에서 만든 모델을 사용하여 지정된 데이터 세트에 일괄 변환을 적용합니다.

사전 조건

이 게시물을 따라 하려면 다음이 포함된 AWS 계정이 필요합니다. 스튜디오 도메인.

파이프라인은 SageMaker 엔터티 및 리소스와 직접 통합되므로 다른 AWS 서비스와 상호 작용할 필요가 없습니다. 또한 완전 관리형 서비스이므로 리소스를 관리할 필요가 없습니다. 즉, 리소스를 생성하고 관리합니다. Studio의 통합 구성 요소와 함께 독립 실행형 Python API인 다양한 SageMaker 구성 요소에 대한 자세한 내용은 다음을 참조하십시오. SageMaker 제품 페이지.

시작하기 전에 다음 코드 조각을 사용하여 Studio 노트북에 SageMaker SDK 버전 >= 2.104.0 및 xlrd >=1.0.0을 설치합니다.

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML 워크플로

이 게시물에서는 다음 구성 요소를 사용합니다.

  • 데이터 준비
    • SageMaker 처리 – SageMaker Processing은 ML 워크로드에 대한 사용자 지정 데이터 변환 및 기능 엔지니어링을 실행할 수 있는 완전 관리형 서비스입니다.
  • 모델 구축
  • 모델 교육 및 평가
    • 원클릭 교육 – SageMaker 분산 교육 기능. SageMaker는 데이터 병렬 및 모델 병렬을 위한 분산 교육 라이브러리를 제공합니다. 라이브러리는 SageMaker 교육 환경에 최적화되어 있으며 분산 교육 작업을 SageMaker에 적용하고 교육 속도와 처리량을 개선합니다.
    • SageMaker 실험 – 실험은 ML 반복을 구성, 추적, 비교 및 ​​평가할 수 있는 SageMaker의 기능입니다.
    • SageMaker 일괄 변환 – 일괄 변환 또는 오프라인 채점은 ML 모델을 사용하여 더 큰 데이터 세트를 예측할 수 있는 SageMaker의 관리형 서비스입니다.
  • 워크플로 오케스트레이션

SageMaker 파이프라인은 JSON 파이프라인 정의에 의해 정의된 일련의 상호 연결된 단계입니다. DAG(방향성 비순환 그래프)를 사용하여 파이프라인을 인코딩합니다. DAG는 파이프라인의 각 단계에 대한 요구 사항 및 관계에 대한 정보를 제공하며 그 구조는 단계 간의 데이터 종속성에 의해 결정됩니다. 이러한 종속성은 단계 출력의 속성이 다른 단계의 입력으로 전달될 때 생성됩니다.

다음 다이어그램은 단계 정의에 의해 정의된 입력 및 출력을 기반으로 SageMaker가 단계 간의 연결을 유추하는 SageMaker 파이프라인(이탈 예측 사용 사례의 경우)의 다양한 단계를 보여줍니다.

다음 섹션에서는 파이프라인의 각 단계를 생성하고 생성되면 전체 파이프라인을 실행하는 방법을 안내합니다.

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.

프로젝트 구조

프로젝트 구조부터 시작하겠습니다.

  • /sm-파이프라인-엔드-투-엔드-예 – 프로젝트 이름
    • /데이터 – 데이터 세트
    • /파이프라인 – 파이프라인 구성 요소에 대한 코드 파일
      • /고객 이탈
        • 전처리.py
        • 평가하다.py
    • 세이지메이커-파이프라인-project.ipynb – Pipelines의 새로운 기능을 사용하여 모델링 워크플로를 안내하는 노트북

데이터세트 다운로드

이 게시물을 따라 하려면 다운로드하고 저장해야 합니다. 샘플 데이터 세트 파일을 저장하는 프로젝트 홈 디렉토리 내의 데이터 폴더 아래 아마존 탄성 파일 시스템 (Amazon EFS) Studio 환경 내.

파이프라인 구성 요소 빌드

이제 파이프라인 구성 요소를 빌드할 준비가 되었습니다.

문 가져오기 및 매개변수 및 상수 선언

라는 Studio 노트북을 만듭니다. sagemaker-pipelines-project.ipynb 프로젝트 홈 디렉토리 내. 셀에 다음 코드 블록을 입력하고 셀을 실행하여 SageMaker 및 S3 클라이언트 개체를 설정하고 PipelineSession, SageMaker 세션과 함께 제공되는 기본 버킷을 사용하여 S3 버킷 위치를 설정합니다.

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

파이프라인은 파이프라인 코드를 변경하지 않고 런타임에 입력 매개변수를 지정할 수 있는 매개변수화를 지원합니다. 아래에서 사용 가능한 모듈을 사용할 수 있습니다. sagemaker.workflow.parameters 와 같은 모듈 ParameterInteger, ParameterFloatParameterString, 다양한 데이터 유형의 파이프라인 매개변수를 지정합니다. 다음 코드를 실행하여 여러 입력 매개변수를 설정합니다.

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

배치 데이터세트 생성

나중에 일괄 변환 단계에서 사용할 일괄 데이터세트를 생성합니다.

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

S3 버킷에 데이터 업로드

데이터 세트를 Amazon S3에 업로드합니다.

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

처리 스크립트 및 처리 단계 정의

이 단계에서는 기능 엔지니어링, 하나의 핫 인코딩을 수행하고 모델 구축에 사용할 교육, 검증 및 테스트 분할을 선별하기 위해 Python 스크립트를 준비합니다. 다음 코드를 실행하여 처리 스크립트를 빌드합니다.

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

다음으로 다음 코드 블록을 실행하여 프로세서를 인스턴스화하고 파이프라인 단계를 실행하여 처리 스크립트를 실행합니다. 처리 스크립트는 Pandas로 작성되었기 때문에 SKLearn 프로세서. 파이프라인 ProcessingStep 함수는 프로세서, 원시 데이터 세트의 입력 S3 위치, 처리된 데이터 세트를 저장할 출력 S3 위치의 인수를 사용합니다.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

훈련 단계 정의

SageMaker XGBoost 추정기 및 파이프라인을 사용하여 모델 교육 설정 TrainingStep 기능:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

평가 스크립트 및 모델 평가 단계 정의

다음 코드 블록을 실행하여 학습된 모델을 평가합니다. 이 스크립트는 AUC 점수가 지정된 임계값을 충족하는지 확인하는 논리를 캡슐화합니다.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

다음으로 다음 코드 블록을 실행하여 프로세서를 인스턴스화하고 파이프라인 단계를 실행하여 평가 스크립트를 실행합니다. 평가 스크립트는 XGBoost 패키지를 사용하기 때문에 ScriptProcessor XGBoost 이미지와 함께. 파이프라인 ProcessingStep 함수는 프로세서, 원시 데이터 세트의 입력 S3 위치, 처리된 데이터 세트를 저장할 출력 S3 위치의 인수를 사용합니다.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

모델 생성 단계 정의

파이프라인 모델 단계를 사용하여 SageMaker 모델을 생성하려면 다음 코드 블록을 실행하십시오. 이 단계에서는 교육 단계의 출력을 활용하여 배포할 모델을 패키징합니다. 인스턴스 유형 인수의 값은 이전 게시물에서 정의한 Pipelines 매개변수를 사용하여 전달됩니다.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

일괄 변환 단계 정의

다음 코드 블록을 실행하여 첫 번째 단계에서 생성된 배치 입력으로 훈련된 모델을 사용하여 배치 변환을 실행합니다.

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

모델 등록 단계 정의

다음 코드는 파이프라인 모델 단계를 사용하여 SageMaker 모델 레지스트리 내에 모델을 등록합니다.

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

파이프라인을 중지하는 실패 단계 정의

다음 코드는 AUC 점수가 정의된 임계값을 충족하지 않는 경우 오류 메시지와 함께 파이프라인 실행을 중지하는 파이프라인 실패 단계를 정의합니다.

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

AUC 점수 확인을 위한 조건 단계 정의

다음 코드는 AUC 점수를 확인하고 조건부로 모델을 생성하고 일괄 변환을 실행하고 모델 레지스트리에 모델을 등록하거나 실패한 상태에서 파이프라인 실행을 중지하는 조건 단계를 정의합니다.

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

파이프라인 빌드 및 실행

모든 구성 요소 단계를 정의한 후 이를 Pipelines 개체로 어셈블할 수 있습니다. 파이프라인은 단계 간의 종속성을 기반으로 순서 순서를 자동으로 유추하므로 파이프라인의 순서를 지정할 필요가 없습니다.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

노트북의 셀에서 다음 코드를 실행합니다. 파이프라인이 이미 있는 경우 코드는 파이프라인을 업데이트합니다. 파이프라인이 없으면 새 파이프라인을 만듭니다.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

결론

이 게시물에서 우리는 다른 내장 SageMaker 기능 및 XGBoost 알고리즘과 함께 파이프라인에서 사용할 수 있는 몇 가지 새로운 기능을 소개하여 이탈 예측을 위한 모델을 개발, 반복 및 배포했습니다. 추가 데이터 소스로 솔루션 확장 가능

자신의 ML 워크플로를 구현합니다. 파이프라인 워크플로에서 사용할 수 있는 단계에 대한 자세한 내용은 Amazon SageMaker 모델 구축 파이프라인SageMaker 워크플로. 그만큼 AWS SageMaker 예제 GitHub 리포지토리에는 파이프라인을 사용하는 다양한 사용 사례에 대한 더 많은 예제가 있습니다.


저자에 관하여

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.제리 펭 AWS SageMaker의 소프트웨어 개발 엔지니어입니다. 그는 교육에서 프로덕션의 모델 모니터링에 이르기까지 종단 간 대규모 MLOps 시스템을 구축하는 데 중점을 둡니다. 그는 또한 더 많은 청중에게 MLOps의 개념을 제공하는 데 열정적입니다.

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.드웬 치 AWS의 소프트웨어 개발 엔지니어입니다. 그녀는 현재 SageMaker 파이프라인 개발 및 개선에 중점을 두고 있습니다. 일 외에는 첼로 연습을 즐깁니다.

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.가야트리 가나코타 AWS Professional Services의 선임 기계 학습 엔지니어입니다. 그녀는 다양한 도메인에서 AI/ML 솔루션을 개발, 배포 및 설명하는 데 열정적입니다. 이 역할 이전에 그녀는 금융 및 소매 분야의 최고 글로벌 기업에서 데이터 과학자 및 ML 엔지니어로 여러 이니셔티브를 주도했습니다. 그녀는 볼더에 있는 콜로라도 대학교에서 데이터 과학 전문 컴퓨터 과학 석사 학위를 취득했습니다.

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.루피더 그레왈 AWS의 Sr Ai/ML 전문가 솔루션 아키텍트입니다. 그는 현재 SageMaker에서 모델 및 MLOps 서비스에 중점을 두고 있습니다. 이 역할을 하기 전에는 머신 러닝 엔지니어로 모델을 구축하고 호스팅하는 일을 했습니다. 일 외에는 테니스를 치고 산길에서 자전거를 타는 것을 즐깁니다.

Amazon SageMaker Pipelines 및 Amazon SageMaker SDK PlatoBlockchain Data Intelligence의 새로운 기능. 수직 검색. 일체 포함.레이 리 AWS Professional Services의 선임 데이터 과학자입니다. 그의 전문 분야는 스타트업에서 기업 조직에 이르기까지 다양한 규모의 고객을 위한 AI/ML 솔루션 구축 및 운영에 중점을 두고 있습니다. Ray는 일 외에 운동과 여행을 즐깁니다.

타임 스탬프 :

더보기 AWS 기계 학습