ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK

خطوط لوله آمازون SageMaker به دانشمندان داده و مهندسان یادگیری ماشین (ML) اجازه می‌دهد گردش‌های کاری آموزشی را خودکار کنند، که به شما کمک می‌کند فرآیندی تکرارپذیر برای هماهنگ کردن مراحل توسعه مدل برای آزمایش سریع و بازآموزی مدل ایجاد کنید. می‌توانید کل گردش کار ساخت مدل، از جمله آماده‌سازی داده‌ها، مهندسی ویژگی‌ها، آموزش مدل، تنظیم مدل، و اعتبارسنجی مدل را خودکار کنید و آن را در رجیستری مدل فهرست کنید. می‌توانید خطوط لوله را به گونه‌ای پیکربندی کنید که در فواصل زمانی منظم یا زمانی که رویدادهای خاصی فعال می‌شوند، به طور خودکار اجرا شوند، یا می‌توانید در صورت نیاز آنها را به صورت دستی اجرا کنید.

در این پست، برخی از پیشرفت‌ها را برجسته می‌کنیم آمازون SageMaker SDK و معرفی ویژگی‌های جدید Amazon SageMaker Pipelines که ساخت و آموزش مدل‌های ML را برای پزشکان ML آسان‌تر می‌کند.

Pipelines به نوآوری در تجربه توسعه دهندگان خود ادامه می دهد و با این نسخه های اخیر، اکنون می توانید از این سرویس به روشی سفارشی تر استفاده کنید:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 - اسناد به روز شده در PipelineVariable استفاده برای برآوردگر، پردازنده، تیونر، ترانسفورماتور، و کلاس های پایه مدل، مدل های آمازون، و مدل های چارچوب. تغییرات بیشتری با نسخه‌های جدیدتر SDK برای پشتیبانی از همه زیر کلاس‌های برآوردگرها و پردازنده‌ها وجود خواهد داشت.
  • 2.90.0 - در دسترس بودن ModelStep برای ایجاد منابع مدل یکپارچه و وظایف ثبت نام.
  • 2.88.2 - در دسترس بودن Pipeline Session برای تعامل مدیریت شده با نهادها و منابع SageMaker.
  • 2.88.2 - سازگاری زیر کلاس برای مراحل کار خط لوله گردش کار بنابراین می‌توانید انتزاع‌های شغلی بسازید و کارهای پردازش، آموزش، تبدیل، و تنظیم را پیکربندی و اجرا کنید، همانطور که بدون خط لوله انجام می‌دهید.
  • 2.76.0 - در دسترس بودن FailStep برای توقف مشروط یک خط لوله با وضعیت خرابی.

در این پست، با استفاده از یک مجموعه داده نمونه با تمرکز بر ساخت و استقرار مدل، شما را از طریق یک گردش کار راهنمایی می‌کنیم تا نحوه پیاده‌سازی ویژگی‌های جدید Pipelines را نشان دهیم. در پایان، باید اطلاعات کافی برای استفاده موفقیت‌آمیز از این ویژگی‌های جدیدتر و ساده‌سازی حجم کاری ML خود داشته باشید.

مشخصات کلی

Pipelines ویژگی های جدید زیر را ارائه می دهد:

  • حاشیه نویسی متغیر خط لوله - پارامترهای روش خاصی، انواع ورودی را می پذیرند، از جمله PipelineVariables، و اسناد اضافی برای روشن شدن مکان اضافه شده است PipelineVariables هم در آخرین نسخه پایدار اسناد SageMaker SDK و هم در امضای اولیه توابع پشتیبانی می شوند. به عنوان مثال، در برآوردگر TensorFlow زیر، امضای init اکنون این را نشان می دهد model_dir و image_uri پشتیبانی PipelineVariables، در حالی که سایر پارامترها اینطور نیستند. برای اطلاعات بیشتر مراجعه کنید تخمینگر TensorFlow.
    • قبل از:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • بعد از:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • جلسه خط لوله - Pipeline Session مفهوم جدیدی است که برای ایجاد وحدت در سراسر SageMaker SDK معرفی شده است و مقدار دهی اولیه منابع خط لوله را معرفی می کند (تماس های اجرا گرفته می شوند اما تا زمانی که خط لوله ایجاد و اجرا نشود اجرا نمی شوند). این PipelineSession زمینه به ارث می برد SageMakerSession و روش‌های مناسبی را برای تعامل با سایر نهادها و منابع SageMaker، مانند مشاغل آموزشی، نقاط پایانی، و مجموعه داده‌های ورودی ذخیره شده در سرویس ذخیره سازی ساده آمازون (Amazon S3).
  • سازگاری زیر کلاس با مراحل کار خط لوله گردش کار – اکنون می‌توانید انتزاع‌های شغلی بسازید و کارهای پردازش، آموزش، تبدیل و تنظیم را بدون خط لوله پیکربندی و اجرا کنید.
    • به عنوان مثال، ایجاد مرحله پردازش با SKLearnProcessor قبلاً موارد زیر مورد نیاز بود:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • همانطور که در کد قبل می بینیم، ProcessingStep باید اساساً همان منطق پیش پردازش را انجام دهد .run، فقط بدون شروع فراخوانی API برای شروع کار. اما با سازگاری زیر کلاس که اکنون با مراحل کار خط لوله گردش کار فعال شده است، ما این را اعلام می کنیم step_args آرگومان که منطق پیش پردازش را با .run می گیرد تا بتوانید یک انتزاع کار بسازید و آن را همانطور که بدون Pipelines استفاده می کنید پیکربندی کنید. ما نیز در عبور می کنیم pipeline_session، که است PipelineSession شیء، به جای sagemaker_session تا مطمئن شوید که فراخوان‌های اجرا گرفته می‌شوند اما تا زمانی که خط لوله ایجاد و اجرا نشود، فراخوانی نمی‌شود. کد زیر را ببینید:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • مرحله مدل (رویکردی ساده با مراحل ایجاد و ثبت مدل) -Pipelines دو نوع مرحله ای را برای ادغام با مدل های SageMaker ارائه می دهد: CreateModelStep و RegisterModel. اکنون می توانید هر دو را فقط با استفاده از ModelStep نوع توجه داشته باشید که الف PipelineSession برای دستیابی به این امر لازم است. این شباهت بین مراحل خط لوله و SDK به ارمغان می آورد.
    • قبل از:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • بعد از:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • مرحله شکست (توقف مشروط اجرای خط لوله) - FailStep اجازه می دهد تا یک خط لوله با وضعیت خرابی در صورت رعایت یک شرط متوقف شود، مثلاً اگر امتیاز مدل زیر یک آستانه خاص باشد.

بررسی اجمالی راه حل

در این راه حل، نقطه ورود شما است Amazon SageMaker Studio محیط توسعه یکپارچه (IDE) برای آزمایش سریع. استودیو محیطی را برای مدیریت تجربه پایان به پایان Pipelines ارائه می دهد. با استودیو، می توانید دور زدن کنسول مدیریت AWS برای کل مدیریت گردش کار شما برای اطلاعات بیشتر در مورد مدیریت خطوط لوله از داخل استودیو، مراجعه کنید مشاهده، ردیابی و اجرای خطوط لوله SageMaker در SageMaker Studio.

نمودار زیر معماری سطح بالای گردش کار ML را با مراحل مختلف آموزش و تولید استنتاج با استفاده از ویژگی های جدید نشان می دهد.

خط لوله شامل مراحل زیر است:

  1. داده‌ها را برای ساخت ویژگی‌های مورد نیاز پیش پردازش کنید و داده‌ها را به مجموعه داده‌های قطار، اعتبارسنجی و آزمایش تقسیم کنید.
  2. با چارچوب SageMaker XGBoost یک شغل آموزشی ایجاد کنید.
  3. مدل آموزش دیده را با استفاده از مجموعه داده آزمایشی ارزیابی کنید.
  4. بررسی کنید که آیا امتیاز AUC بالاتر از یک آستانه از پیش تعریف شده است.
    • اگر امتیاز AUC کمتر از آستانه باشد، اجرای خط لوله را متوقف کرده و آن را به عنوان ناموفق علامت بزنید.
    • اگر امتیاز AUC بیشتر از آستانه است، یک مدل SageMaker ایجاد کنید و آن را در رجیستری مدل SageMaker ثبت کنید.
  5. با استفاده از مدل ایجاد شده در مرحله قبل، تبدیل دسته ای را روی مجموعه داده داده شده اعمال کنید.

پیش نیازها

برای دنبال کردن این پست، به یک حساب کاربری AWS با a نیاز دارید دامنه استودیو.

Pipelines مستقیماً با نهادها و منابع SageMaker یکپارچه شده است، بنابراین شما نیازی به تعامل با سایر خدمات AWS ندارید. همچنین نیازی به مدیریت هیچ منبعی ندارید زیرا این یک سرویس کاملاً مدیریت شده است، به این معنی که منابع را برای شما ایجاد و مدیریت می کند. برای کسب اطلاعات بیشتر در مورد مولفه های مختلف SageMaker که هر دو API مستقل پایتون و اجزای یکپارچه Studio هستند، به صفحه محصول SageMaker.

قبل از شروع، SageMaker SDK نسخه >= 2.104.0 و xlrd >=1.0.0 را در نوت بوک استودیو با استفاده از قطعه کد زیر نصب کنید:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

گردش کار ML

برای این پست از اجزای زیر استفاده می کنید:

  • آماده سازی داده ها
    • پردازش SageMaker – SageMaker Processing یک سرویس کاملاً مدیریت شده است که به شما امکان می دهد تبدیل داده های سفارشی و مهندسی ویژگی را برای بارهای کاری ML اجرا کنید.
  • ساختمان مدل
  • آموزش و ارزیابی الگو
    • آموزش با یک کلیک - ویژگی آموزشی SageMaker توزیع شده است. SageMaker کتابخانه های آموزشی توزیع شده ای را برای موازی سازی داده ها و موازی سازی مدل ها فراهم می کند. کتابخانه ها برای محیط آموزشی SageMaker بهینه شده اند، به تطبیق مشاغل آموزشی توزیع شده شما با SageMaker و بهبود سرعت و توان عملیاتی آموزش کمک می کنند.
    • آزمایش های SageMaker – Experiments یک قابلیت SageMaker است که به شما امکان می دهد تکرارهای ML خود را سازماندهی، پیگیری، مقایسه و ارزیابی کنید.
    • تبدیل دسته ای SageMaker – تبدیل دسته ای یا امتیاز دهی آفلاین یک سرویس مدیریت شده در SageMaker است که به شما امکان می دهد با استفاده از مدل های ML خود، روی مجموعه داده بزرگتری پیش بینی کنید.
  • ارکستراسیون گردش کار

خط لوله SageMaker مجموعه ای از مراحل به هم پیوسته است که با تعریف خط لوله JSON تعریف شده است. این یک خط لوله را با استفاده از یک گراف غیر چرخه ای جهت دار (DAG) رمزگذاری می کند. DAG اطلاعاتی در مورد الزامات و روابط بین هر مرحله از خط لوله ارائه می دهد و ساختار آن توسط وابستگی داده ها بین مراحل تعیین می شود. این وابستگی ها زمانی ایجاد می شوند که ویژگی های خروجی یک مرحله به عنوان ورودی به مرحله دیگر ارسال شود.

نمودار زیر مراحل مختلف خط لوله SageMaker را نشان می دهد (برای یک مورد استفاده از پیش بینی انحراف) که در آن اتصالات بین مراحل توسط SageMaker بر اساس ورودی ها و خروجی های تعریف شده توسط تعاریف مرحله استنباط می شود.

بخش های بعدی از طریق ایجاد هر مرحله از خط لوله و اجرای کل خط لوله پس از ایجاد می گذرد.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.

ساختار پروژه

بیایید با ساختار پروژه شروع کنیم:

  • /sm-pipelines-end-to-end-example - نام پروژه
    • /داده ها - مجموعه داده ها
    • / خطوط لوله - فایل های کد برای اجزای خط لوله
      • /ریزش مشتری
        • preprocess.py
        • ارزیابی.py
    • sagemaker-pipelines-project.ipynb – یک نوت بوک که در جریان کار مدل سازی با استفاده از ویژگی های جدید Pipelines قدم می زند

مجموعه داده را دانلود کنید

برای دنبال کردن این پست، باید آن را دانلود و ذخیره کنید مجموعه داده نمونه زیر پوشه داده در فهرست خانه پروژه، که فایل را در آن ذخیره می کند سیستم فایل الاستیک آمازون (Amazon EFS) در محیط Studio.

اجزای خط لوله را بسازید

اکنون شما آماده ساخت اجزای خط لوله هستید.

دستورات را وارد کنید و پارامترها و ثابت ها را اعلام کنید

یک نوت بوک استودیویی به نام ایجاد کنید sagemaker-pipelines-project.ipynb در فهرست خانه پروژه بلوک کد زیر را در یک سلول وارد کنید، و سلول را برای راه اندازی اشیاء کلاینت SageMaker و S3 اجرا کنید، ایجاد کنید. PipelineSessionو مکان سطل S3 را با استفاده از سطل پیش‌فرض همراه با یک جلسه SageMaker تنظیم کنید:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines از پارامترسازی پشتیبانی می کند که به شما امکان می دهد پارامترهای ورودی را در زمان اجرا بدون تغییر کد خط لوله خود مشخص کنید. می توانید از ماژول های موجود در زیر استفاده کنید sagemaker.workflow.parameters ماژول مانند ParameterInteger, ParameterFloatو ParameterString، برای تعیین پارامترهای خط لوله انواع داده های مختلف. کد زیر را برای تنظیم چند پارامتر ورودی اجرا کنید:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

یک مجموعه داده دسته ای ایجاد کنید

مجموعه داده دسته ای را ایجاد کنید که بعداً در مرحله تبدیل دسته ای استفاده می کنید:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

داده ها را در یک سطل S3 آپلود کنید

مجموعه داده ها را در آمازون S3 بارگذاری کنید:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

یک اسکریپت پردازش و مرحله پردازش را تعریف کنید

در این مرحله، شما یک اسکریپت پایتون را برای انجام مهندسی ویژگی، یک کدگذاری داغ، و تنظیم آموزش، اعتبار سنجی و تقسیم‌های آزمایشی برای استفاده در ساخت مدل آماده می‌کنید. کد زیر را برای ساخت اسکریپت پردازشی خود اجرا کنید:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

سپس بلوک کد زیر را برای نمونه سازی پردازنده و مرحله Pipelines را برای اجرای اسکریپت پردازش اجرا کنید. از آنجایی که اسکریپت پردازش در پاندا نوشته شده است، از a استفاده می کنید SKLearnProcessor. خطوط لوله ProcessingStep تابع آرگومان های زیر را می گیرد: پردازنده، مکان های ورودی S3 برای مجموعه داده های خام، و مکان های خروجی S3 برای ذخیره مجموعه داده های پردازش شده.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

یک مرحله آموزشی را تعریف کنید

آموزش مدل را با استفاده از برآوردگر SageMaker XGBoost و Pipelines تنظیم کنید TrainingStep عملکرد:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

اسکریپت ارزیابی و مرحله ارزیابی مدل را تعریف کنید

بلوک کد زیر را برای ارزیابی مدل پس از آموزش اجرا کنید. این اسکریپت منطق را محصور می کند تا بررسی کند که آیا امتیاز AUC با آستانه مشخص شده مطابقت دارد یا خیر.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

سپس بلوک کد زیر را برای نمونه سازی پردازنده و مرحله Pipelines را برای اجرای اسکریپت ارزیابی اجرا کنید. از آنجا که اسکریپت ارزیابی از بسته XGBoost استفاده می کند، شما از a استفاده می کنید ScriptProcessor به همراه تصویر XGBoost. خطوط لوله ProcessingStep تابع آرگومان های زیر را می گیرد: پردازنده، مکان های ورودی S3 برای مجموعه داده های خام، و مکان های خروجی S3 برای ذخیره مجموعه داده های پردازش شده.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

یک مرحله ایجاد مدل را تعریف کنید

بلوک کد زیر را برای ایجاد یک مدل SageMaker با استفاده از مرحله مدل Pipelines اجرا کنید. این مرحله از خروجی مرحله آموزش برای بسته بندی مدل برای استقرار استفاده می کند. توجه داشته باشید که مقدار آرگومان نوع نمونه با استفاده از پارامتر Pipelines که قبلا در پست تعریف کرده بودید ارسال می شود.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

یک مرحله تبدیل دسته ای را تعریف کنید

بلوک کد زیر را برای اجرای تبدیل دسته ای با استفاده از مدل آموزش دیده با ورودی دسته ای ایجاد شده در مرحله اول اجرا کنید:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

یک مرحله مدل ثبت را تعریف کنید

کد زیر مدل را در رجیستری مدل SageMaker با استفاده از مرحله مدل Pipelines ثبت می کند:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

یک مرحله شکست برای توقف خط لوله تعریف کنید

کد زیر مرحله شکست خطوط لوله را برای توقف اجرای خط لوله با یک پیام خطا تعریف می کند اگر امتیاز AUC آستانه تعریف شده را برآورده نکند:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

یک مرحله شرط برای بررسی امتیاز AUC تعریف کنید

کد زیر یک مرحله شرط برای بررسی امتیاز AUC و ایجاد مشروط یک مدل و اجرای یک تبدیل دسته ای و ثبت یک مدل در رجیستری مدل، یا توقف اجرای خط لوله در حالت ناموفق تعریف می کند:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

خط لوله را بسازید و اجرا کنید

پس از تعریف تمام مراحل کامپوننت، می توانید آنها را در یک شی Pipelines جمع آوری کنید. شما نیازی به تعیین ترتیب خط لوله ندارید زیرا Pipelines به طور خودکار ترتیب ترتیب را بر اساس وابستگی های بین مراحل استنباط می کند.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

کد زیر را در سلولی در نوت بوک خود اجرا کنید. اگر خط لوله از قبل وجود داشته باشد، کد خط لوله را به روز می کند. اگر خط لوله وجود نداشته باشد، خط لوله جدیدی ایجاد می کند.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

نتیجه

در این پست، برخی از ویژگی‌های جدیدی که اکنون با Pipelines در دسترس هستند را به همراه سایر ویژگی‌های داخلی SageMaker و الگوریتم XGBoost برای توسعه، تکرار و استقرار مدلی برای پیش‌بینی ریزش معرفی کردیم. راه حل را می توان با منابع داده اضافی گسترش داد

برای پیاده سازی گردش کار ML خود. برای جزئیات بیشتر در مورد مراحل موجود در گردش کار Pipelines، مراجعه کنید خط لوله ساختمان مدل SageMaker آمازون و SageMaker Workflows. نمونه های AWS SageMaker مخزن GitHub نمونه های بیشتری در مورد موارد استفاده مختلف با استفاده از Pipelines دارد.


درباره نویسنده

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.جری پنگ یک مهندس توسعه نرم افزار با AWS SageMaker است. او بر ساختن سیستم MLOps در مقیاس بزرگ از آموزش تا نظارت بر مدل در تولید تمرکز دارد. او همچنین مشتاق است که مفهوم MLOps را به مخاطبان بیشتری ارائه دهد.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.دیون چی مهندس توسعه نرم افزار در AWS است. او در حال حاضر بر توسعه و بهبود خطوط لوله SageMaker تمرکز دارد. در خارج از محل کار، او از تمرین سلو لذت می برد.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.گایاتری گاناکوتا یک مهندس یادگیری ماشین پدر با خدمات حرفه ای AWS است. او مشتاق توسعه، استقرار و توضیح راه‌حل‌های AI/ML در حوزه‌های مختلف است. قبل از این نقش، او به عنوان یک دانشمند داده و مهندس ML با شرکت های برتر جهانی در فضای مالی و خرده فروشی، ابتکارات متعددی را رهبری کرد. او دارای مدرک کارشناسی ارشد در علوم کامپیوتر با تخصص در علوم داده از دانشگاه کلرادو، بولدر است.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.روپیند گروال یک معمار راه حل های تخصصی Sr Ai/ML با AWS است. او در حال حاضر روی ارائه مدل ها و MLO ها در SageMaker تمرکز دارد. قبل از این نقش، او به عنوان مهندس یادگیری ماشین، مدل‌های ساخت و میزبانی کار کرده است. خارج از محل کار او از بازی تنیس و دوچرخه سواری در مسیرهای کوهستانی لذت می برد.

ویژگی های جدید برای Amazon SageMaker Pipelines و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. جستجوی عمودی Ai.ری لی یک Sr. Scientist با خدمات حرفه ای AWS است. تخصص او بر ساخت و عملیاتی کردن راه حل های AI/ML برای مشتریان با اندازه های مختلف، از استارت آپ ها تا سازمان های سازمانی متمرکز است. خارج از محل کار، ری از تناسب اندام و مسافرت لذت می برد.

تمبر زمان:

بیشتر از آموزش ماشین AWS