Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK

Đường ống Amazon SageMaker cho phép các nhà khoa học dữ liệu và kỹ sư máy học (ML) tự động hóa quy trình đào tạo, giúp bạn tạo một quy trình có thể lặp lại để sắp xếp các bước phát triển mô hình nhằm thử nghiệm nhanh chóng và đào tạo lại mô hình. Bạn có thể tự động hóa toàn bộ quy trình xây dựng mô hình, bao gồm chuẩn bị dữ liệu, kỹ thuật tính năng, đào tạo mô hình, điều chỉnh mô hình và xác thực mô hình, đồng thời lập danh mục trong sổ đăng ký mô hình. Bạn có thể định cấu hình quy trình để chạy tự động theo định kỳ hoặc khi một số sự kiện nhất định được kích hoạt hoặc bạn có thể chạy chúng theo cách thủ công nếu cần.

Trong bài đăng này, chúng tôi nhấn mạnh một số cải tiến đối với Amazon SageMaker SDK và giới thiệu các tính năng mới của Amazon SageMaker Pipelines giúp những người thực hành ML xây dựng và đào tạo các mô hình ML dễ dàng hơn.

Pipelines tiếp tục đổi mới trải nghiệm dành cho nhà phát triển của mình và với những bản phát hành gần đây này, giờ đây bạn có thể sử dụng dịch vụ theo cách tùy chỉnh hơn:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Cập nhật tài liệu về PipelineVariable cách sử dụng cho các lớp cơ sở ước tính, bộ xử lý, bộ điều chỉnh, máy biến áp và mô hình, mô hình Amazon và mô hình khung. Sẽ có thêm những thay đổi đi kèm với các phiên bản SDK mới hơn để hỗ trợ tất cả các lớp con của bộ ước tính và bộ xử lý.
  • 2.90.0 – Sự sẵn có của Mô hìnhBước cho các nhiệm vụ đăng ký và tạo tài nguyên mô hình tích hợp.
  • 2.88.2 – Sự sẵn có của Phiên giao dịch đường ống để tương tác được quản lý với các thực thể và tài nguyên SageMaker.
  • 2.88.2 – Khả năng tương thích của lớp con các bước công việc của quy trình công việc để bạn có thể xây dựng các bản tóm tắt công việc, đặt cấu hình và chạy các công việc xử lý, đào tạo, chuyển đổi và điều chỉnh như bạn muốn mà không cần quy trình.
  • 2.76.0 – Sự sẵn có của Bước thất bại để dừng có điều kiện một đường ống có trạng thái lỗi.

Trong bài đăng này, chúng tôi sẽ hướng dẫn bạn quy trình làm việc bằng cách sử dụng tập dữ liệu mẫu tập trung vào việc xây dựng và triển khai mô hình để trình bày cách triển khai các tính năng mới của Pipelines. Cuối cùng, bạn sẽ có đủ thông tin để sử dụng thành công các tính năng mới hơn này và đơn giản hóa khối lượng công việc ML của mình.

Tổng quan về tính năng

Pipelines cung cấp các tính năng mới sau:

  • Chú thích biến đường ống – Một số tham số phương thức nhất định chấp nhận nhiều loại đầu vào, bao gồm PipelineVariables, và tài liệu bổ sung đã được thêm vào để làm rõ nơi PipelineVariables được hỗ trợ trong cả phiên bản ổn định mới nhất của tài liệu SageMaker SDK và chữ ký ban đầu của các hàm. Ví dụ: trong công cụ ước tính TensorFlow sau đây, chữ ký init hiện cho thấy rằng model_dirimage_uri hỗ trợ PipelineVariables, trong khi các tham số khác thì không. Để biết thêm thông tin, hãy tham khảo Công cụ ước tính TensorFlow.
    • Trước:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Sau:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Phiên đường ốngPhiên giao dịch đường ống là một khái niệm mới được giới thiệu nhằm mang lại sự thống nhất trên SDK SageMaker và giới thiệu tính năng khởi tạo từng phần của tài nguyên quy trình (các lệnh gọi chạy được ghi lại nhưng không chạy cho đến khi quy trình được tạo và chạy). Các PipelineSession bối cảnh kế thừa SageMakerSession và triển khai các phương pháp thuận tiện để bạn tương tác với các thực thể và tài nguyên SageMaker khác, chẳng hạn như công việc đào tạo, điểm cuối và tập dữ liệu đầu vào được lưu trữ trong Dịch vụ lưu trữ đơn giản của Amazon (Amazon S3).
  • Khả năng tương thích của lớp con với các bước công việc của quy trình công việc – Giờ đây, bạn có thể xây dựng các bản tóm tắt công việc, đặt cấu hình và chạy các công việc xử lý, đào tạo, chuyển đổi và điều chỉnh như bạn muốn mà không cần quy trình.
    • Ví dụ: tạo một bước xử lý với SKLearnProcessor trước đây yêu cầu như sau:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Như chúng ta thấy trong đoạn mã trước, ProcessingStep về cơ bản cần thực hiện logic tiền xử lý giống như .run, chỉ cần không thực hiện lệnh gọi API để bắt đầu công việc. Nhưng với khả năng tương thích của lớp con hiện được kích hoạt với các bước công việc của quy trình công việc, chúng tôi khai báo step_args đối số sử dụng logic tiền xử lý với .run để bạn có thể xây dựng bản tóm tắt công việc và định cấu hình nó như cách bạn sử dụng nó mà không cần Đường ống. Chúng tôi cũng vượt qua trong pipeline_session, Mà là một PipelineSession đối tượng, thay vì sagemaker_session để đảm bảo các cuộc gọi đang chạy được ghi lại nhưng không được gọi cho đến khi đường dẫn được tạo và chạy. Xem đoạn mã sau:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Bước mô hình (một cách tiếp cận hợp lý với các bước tạo và đăng ký mô hình) –Pipelines cung cấp hai loại bước để tích hợp với mô hình SageMaker: CreateModelStepRegisterModel. Bây giờ bạn có thể đạt được cả hai chỉ bằng cách sử dụng ModelStep kiểu. Lưu ý rằng một PipelineSession là cần thiết để đạt được điều này. Điều này mang lại sự tương đồng giữa các bước quy trình và SDK.
    • Trước:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Sau:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Bước thất bại (dừng có điều kiện khi chạy đường ống)FailStep cho phép dừng quy trình với trạng thái lỗi nếu đáp ứng một điều kiện, chẳng hạn như nếu điểm mô hình dưới một ngưỡng nhất định.

Tổng quan về giải pháp

Trong giải pháp này, điểm vào của bạn là Xưởng sản xuất Amazon SageMaker môi trường phát triển tích hợp (IDE) để thử nghiệm nhanh chóng. Studio cung cấp một môi trường để quản lý trải nghiệm Pipelines từ đầu đến cuối. Với Studio, bạn có thể bỏ qua Bảng điều khiển quản lý AWS để quản lý toàn bộ quy trình làm việc của bạn. Để biết thêm thông tin về cách quản lý Quy trình từ bên trong Studio, hãy tham khảo Xem, theo dõi và thực hiện các đường ống dẫn của SageMaker trong SageMaker Studio.

Sơ đồ sau minh họa kiến ​​trúc cấp cao của quy trình làm việc ML với các bước khác nhau để huấn luyện và tạo ra các suy luận bằng cách sử dụng các tính năng mới.

Đường ống bao gồm các bước sau:

  1. Xử lý trước dữ liệu để xây dựng các tính năng cần thiết và phân chia dữ liệu thành các tập dữ liệu huấn luyện, xác thực và kiểm tra.
  2. Tạo công việc đào tạo với khung SageMaker XGBoost.
  3. Đánh giá mô hình được đào tạo bằng cách sử dụng tập dữ liệu thử nghiệm.
  4. Kiểm tra xem điểm AUC có cao hơn ngưỡng xác định trước không.
    • Nếu điểm AUC nhỏ hơn ngưỡng, hãy dừng quá trình chạy quy trình và đánh dấu là không thành công.
    • Nếu điểm AUC lớn hơn ngưỡng, hãy tạo mô hình SageMaker và đăng ký mô hình đó trong sổ đăng ký mô hình SageMaker.
  5. Áp dụng chuyển đổi hàng loạt trên tập dữ liệu đã cho bằng cách sử dụng mô hình đã tạo ở bước trước.

Điều kiện tiên quyết

Để theo dõi bài đăng này, bạn cần có tài khoản AWS có địa chỉ Miền studio.

Pipelines được tích hợp trực tiếp với các thực thể và tài nguyên của SageMaker, do đó bạn không cần phải tương tác với bất kỳ dịch vụ AWS nào khác. Bạn cũng không cần quản lý bất kỳ tài nguyên nào vì đây là dịch vụ được quản lý hoàn toàn, nghĩa là dịch vụ này tạo và quản lý tài nguyên cho bạn. Để biết thêm thông tin về các thành phần SageMaker khác nhau, cả hai đều là API Python độc lập và các thành phần tích hợp của Studio, hãy xem phần Trang sản phẩm SageMaker.

Trước khi bắt đầu, hãy cài đặt phiên bản SageMaker SDK >= 2.104.0 và xlrd >=1.0.0 trong sổ tay Studio bằng đoạn mã sau:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Quy trình làm việc ML

Đối với bài đăng này, bạn sử dụng các thành phần sau:

  • Chuẩn bị dữ liệu
    • Xử lý SageMaker – SageMaker Treatment là dịch vụ được quản lý hoàn toàn cho phép bạn chạy các chuyển đổi dữ liệu tùy chỉnh và kỹ thuật tính năng cho khối lượng công việc ML.
  • Xây dựng mô hình
  • Đào tạo và đánh giá mô hình
    • Đào tạo bằng một cú nhấp chuột – Tính năng đào tạo phân tán của SageMaker. SageMaker cung cấp các thư viện đào tạo phân tán về song song dữ liệu và song song mô hình. Các thư viện được tối ưu hóa cho môi trường đào tạo SageMaker, giúp điều chỉnh các công việc đào tạo được phân phối của bạn cho phù hợp với SageMaker, đồng thời cải thiện tốc độ và thông lượng đào tạo.
    • Thử nghiệm SageMaker – Thử nghiệm là một khả năng của SageMaker cho phép bạn sắp xếp, theo dõi, so sánh và đánh giá các lần lặp ML của mình.
    • Chuyển đổi hàng loạt SageMaker – Chuyển đổi hàng loạt hoặc tính điểm ngoại tuyến là một dịch vụ được quản lý trong SageMaker cho phép bạn dự đoán trên tập dữ liệu lớn hơn bằng mô hình ML của mình.
  • Điều phối quy trình làm việc

Quy trình SageMaker là một chuỗi các bước được kết nối với nhau được xác định theo định nghĩa quy trình JSON. Nó mã hóa một đường ống bằng cách sử dụng biểu đồ tuần hoàn có hướng (DAG). DAG cung cấp thông tin về các yêu cầu và mối quan hệ giữa từng bước của quy trình và cấu trúc của nó được xác định bởi sự phụ thuộc dữ liệu giữa các bước. Những phần phụ thuộc này được tạo khi các thuộc tính đầu ra của một bước được chuyển làm đầu vào cho bước khác.

Sơ đồ sau đây minh họa các bước khác nhau trong quy trình SageMaker (đối với trường hợp sử dụng dự đoán tỷ lệ rời bỏ), trong đó các kết nối giữa các bước được SageMaker suy ra dựa trên đầu vào và đầu ra được xác định bởi định nghĩa bước.

Các phần tiếp theo hướng dẫn cách tạo từng bước của quy trình và chạy toàn bộ quy trình sau khi được tạo.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.

Cấu trúc dự án

Hãy bắt đầu với cấu trúc dự án:

  • /sm-pipelines-end-to-end-example – Tên dự án
    • /dữ liệu – Các bộ dữ liệu
    • / đường ống – Các file mã cho các thành phần đường ống
      • /khách hàngchurn
        • tiền xử lý.py
        • đánh giá
    • sagemaker-pipelines-project.ipynb – Một cuốn sổ tay hướng dẫn quy trình lập mô hình bằng cách sử dụng các tính năng mới của Pipelines

Tải xuống tập dữ liệu

Để theo dõi bài đăng này, bạn cần tải xuống và lưu tập dữ liệu mẫu trong thư mục dữ liệu trong thư mục chính của dự án, nơi lưu tệp vào Hệ thống tệp đàn hồi Amazon (Amazon EFS) trong môi trường Studio.

Xây dựng các thành phần đường ống

Bây giờ bạn đã sẵn sàng xây dựng các thành phần đường ống.

Nhập các câu lệnh và khai báo các tham số và hằng số

Tạo sổ ghi chép Studio có tên sagemaker-pipelines-project.ipynb trong thư mục chính của dự án. Nhập khối mã sau vào một ô và chạy ô đó để thiết lập các đối tượng máy khách SageMaker và S3, tạo PipelineSessionvà thiết lập vị trí vùng lưu trữ S3 bằng cách sử dụng vùng lưu trữ mặc định đi kèm với phiên SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Đường ống hỗ trợ tham số hóa, cho phép bạn chỉ định các tham số đầu vào trong thời gian chạy mà không cần thay đổi mã đường ống. Bạn có thể sử dụng các mô-đun có sẵn trong sagemaker.workflow.parameters mô-đun, chẳng hạn như ParameterInteger, ParameterFloatParameterString, để chỉ định các tham số đường ống của các loại dữ liệu khác nhau. Chạy đoạn mã sau để thiết lập nhiều tham số đầu vào:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Tạo một tập dữ liệu hàng loạt

Tạo tập dữ liệu hàng loạt mà bạn sử dụng sau này trong bước chuyển đổi hàng loạt:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Tải dữ liệu lên nhóm S3

Tải bộ dữ liệu lên Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Xác định tập lệnh xử lý và bước xử lý

Trong bước này, bạn chuẩn bị một tập lệnh Python để thực hiện kỹ thuật tính năng, một mã hóa nóng và quản lý các phần tách đào tạo, xác thực và kiểm tra sẽ được sử dụng để xây dựng mô hình. Chạy đoạn mã sau để xây dựng tập lệnh xử lý của bạn:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Tiếp theo, chạy khối mã sau để khởi tạo bộ xử lý và bước Đường ống để chạy tập lệnh xử lý. Vì tập lệnh xử lý được viết bằng Pandas nên bạn sử dụng SKLearnBộ xử lý. Các đường ống ProcessingStep Hàm nhận các đối số sau: bộ xử lý, vị trí S3 đầu vào cho tập dữ liệu thô và vị trí S3 đầu ra để lưu tập dữ liệu đã xử lý.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Xác định bước đào tạo

Thiết lập đào tạo mô hình bằng công cụ ước tính SageMaker XGBoost và Quy trình TrainingStep chức năng:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Xác định kịch bản đánh giá và bước đánh giá mô hình

Chạy khối mã sau để đánh giá mô hình sau khi được đào tạo. Tập lệnh này gói gọn logic để kiểm tra xem điểm AUC có đáp ứng ngưỡng đã chỉ định hay không.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Tiếp theo, chạy khối mã sau để khởi tạo bộ xử lý và bước Quy trình để chạy tập lệnh đánh giá. Vì tập lệnh đánh giá sử dụng gói XGBoost nên bạn sử dụng ScriptProcessor cùng với hình ảnh XGBoost. Đường ống ProcessingStep Hàm nhận các đối số sau: bộ xử lý, vị trí S3 đầu vào cho tập dữ liệu thô và vị trí S3 đầu ra để lưu tập dữ liệu đã xử lý.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Xác định bước tạo mô hình

Chạy khối mã sau để tạo mô hình SageMaker bằng bước mô hình Đường ống. Bước này sử dụng đầu ra của bước huấn luyện để đóng gói mô hình để triển khai. Lưu ý rằng giá trị cho đối số loại phiên bản được truyền bằng tham số Pipelines mà bạn đã xác định trước đó trong bài viết.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Xác định bước chuyển đổi hàng loạt

Chạy khối mã sau để chạy chuyển đổi hàng loạt bằng cách sử dụng mô hình được đào tạo với đầu vào hàng loạt được tạo ở bước đầu tiên:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Xác định bước mô hình đăng ký

Đoạn mã sau đăng ký mô hình trong sổ đăng ký mô hình SageMaker bằng bước mô hình Đường ống:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Xác định bước thất bại để dừng đường ống

Mã sau đây xác định bước không thành công của Quy trình để dừng quá trình chạy quy trình kèm theo thông báo lỗi nếu điểm AUC không đáp ứng ngưỡng đã xác định:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Xác định bước điều kiện để kiểm tra điểm AUC

Mã sau đây xác định bước điều kiện để kiểm tra điểm AUC và tạo mô hình có điều kiện và chạy chuyển đổi hàng loạt cũng như đăng ký mô hình trong sổ đăng ký mô hình hoặc dừng quy trình chạy ở trạng thái không thành công:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Xây dựng và vận hành đường ống

Sau khi xác định tất cả các bước thành phần, bạn có thể tập hợp chúng thành đối tượng Pipelines. Bạn không cần chỉ định thứ tự của quy trình vì Pipelines tự động suy ra trình tự thứ tự dựa trên sự phụ thuộc giữa các bước.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Chạy đoạn mã sau vào một ô trong sổ ghi chép của bạn. Nếu đường dẫn đã tồn tại, mã sẽ cập nhật đường dẫn. Nếu đường ống không tồn tại, nó sẽ tạo một đường ống mới.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Kết luận

Trong bài đăng này, chúng tôi đã giới thiệu một số tính năng mới hiện có sẵn với Pipelines cùng với các tính năng SageMaker tích hợp khác và thuật toán XGBoost để phát triển, lặp lại và triển khai mô hình dự đoán tỷ lệ rời bỏ. Giải pháp có thể được mở rộng với các nguồn dữ liệu bổ sung

để triển khai quy trình làm việc ML của riêng bạn. Để biết thêm chi tiết về các bước có sẵn trong quy trình làm việc của Quy trình, hãy tham khảo Đường ống xây dựng mô hình Amazon SageMakerQuy trình làm việc của SageMaker. Các Ví dụ về AWS SageMaker GitHub repo có nhiều ví dụ hơn về các trường hợp sử dụng khác nhau bằng Pipelines.


Về các tác giả

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Jerry Bành là kỹ sư phát triển phần mềm của AWS SageMaker. Anh tập trung xây dựng hệ thống MLOps quy mô lớn từ đầu đến cuối từ đào tạo đến giám sát mô hình trong sản xuất. Anh ấy cũng đam mê đưa khái niệm MLOps đến với nhiều đối tượng hơn.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Đức Văn Kỳ là Kỹ sư phát triển phần mềm tại AWS. Cô hiện tập trung vào việc phát triển và cải tiến SageMaker Pipelines. Ngoài công việc, cô thích tập đàn Cello.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Gayatri Ghanakota là Kỹ sư Máy học Sơ cấp với Dịch vụ Chuyên nghiệp AWS. Cô ấy đam mê phát triển, triển khai và giải thích các giải pháp AI / ML trên nhiều lĩnh vực khác nhau. Trước khi đảm nhận vai trò này, bà đã lãnh đạo nhiều sáng kiến ​​với tư cách là nhà khoa học dữ liệu và kỹ sư ML với các công ty toàn cầu hàng đầu trong lĩnh vực tài chính và bán lẻ. Cô có bằng thạc sĩ về Khoa học máy tính chuyên ngành Khoa học dữ liệu tại Đại học Colorado, Boulder.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Rupinder Grewal là Kiến trúc sư Giải pháp Chuyên gia Sr Ai / ML của AWS. Anh ấy hiện đang tập trung vào việc phục vụ người mẫu và MLOps trên SageMaker. Trước khi đảm nhiệm vai trò này, anh ấy đã từng làm việc với tư cách là Kỹ sư máy học xây dựng và lưu trữ các mô hình. Ngoài giờ làm việc, anh ấy thích chơi tennis và đạp xe trên những con đường mòn trên núi.

Các tính năng mới dành cho Amazon SageMaker Pipelines và Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Ray Li là Nhà khoa học dữ liệu cấp cao với Dịch vụ chuyên nghiệp của AWS. Chuyên môn của ông tập trung vào việc xây dựng và vận hành các giải pháp AI/ML cho khách hàng thuộc nhiều quy mô khác nhau, từ các công ty khởi nghiệp đến các tổ chức doanh nghiệp. Ngoài công việc, Ray thích tập thể dục và đi du lịch.

Dấu thời gian:

Thêm từ Học máy AWS