ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK

خطوط أنابيب Amazon SageMaker يسمح لعلماء البيانات ومهندسي التعلم الآلي (ML) بأتمتة تدفقات سير العمل التدريبية ، مما يساعدك على إنشاء عملية قابلة للتكرار لتنظيم خطوات تطوير النموذج للتجربة السريعة وإعادة تدريب النموذج. يمكنك أتمتة سير عمل بناء النموذج بالكامل ، بما في ذلك إعداد البيانات وهندسة الميزات والتدريب على النموذج وضبط النموذج والتحقق من صحة النموذج وفهرسته في سجل النموذج. يمكنك تكوين خطوط الأنابيب للتشغيل تلقائيًا على فترات منتظمة أو عند تشغيل أحداث معينة ، أو يمكنك تشغيلها يدويًا حسب الحاجة.

في هذا المنشور ، نسلط الضوء على بعض التحسينات التي تم إدخالها على ملف الأمازون SageMaker SDK وإدخال ميزات جديدة لخطوط أنابيب Amazon SageMaker التي تسهل على ممارسي التعلم الآلي إنشاء نماذج تعلم الآلة وتدريبها.

تواصل Pipelines ابتكار تجربة المطورين الخاصة بها ، ومع هذه الإصدارات الأخيرة ، يمكنك الآن استخدام الخدمة بطريقة أكثر تخصيصًا:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 - تم تحديث الوثائق PipelineVariable استخدام المقدر والمعالج والموالف والمحول وفئات القاعدة النموذجية ونماذج Amazon ونماذج الإطار. ستكون هناك تغييرات إضافية قادمة مع الإصدارات الأحدث من SDK لدعم جميع الفئات الفرعية للمقدرات والمعالجات.
  • 2.90.0 - توافر نموذج لإنشاء موارد نموذجية متكاملة ومهام التسجيل.
  • 2.88.2 - توافر جلسة خط الأنابيب للتفاعل المُدار مع كيانات وموارد SageMaker.
  • 2.88.2 - توافق الفئة الفرعية لـ خطوات عمل خط سير العمل حتى تتمكن من بناء تجريدات الوظائف وتكوين وتشغيل المعالجة ، والتدريب ، والتحويل ، وضبط الوظائف كما تفعل بدون خط أنابيب.
  • 2.76.0 - توافر فشل الخطوة للإيقاف المشروط لخط أنابيب بحالة فشل.

في هذا المنشور ، نوجهك عبر سير عمل باستخدام عينة مجموعة بيانات مع التركيز على بناء النماذج ونشرها لتوضيح كيفية تنفيذ ميزات خطوط الأنابيب الجديدة. في النهاية ، يجب أن يكون لديك معلومات كافية لاستخدام هذه الميزات الجديدة بنجاح وتبسيط أعباء عمل ML الخاصة بك.

نظرة عامة على الميزات

تقدم خطوط الأنابيب الميزات الجديدة التالية:

  • شرح متغير خط الأنابيب - تقبل معلمات طريقة معينة أنواعًا متعددة من المدخلات ، بما في ذلك PipelineVariables، وتمت إضافة وثائق إضافية لتوضيح مكان PipelineVariables مدعومة في أحدث إصدار ثابت من وثائق SageMaker SDK والتوقيع الأولي للوظائف. على سبيل المثال ، في مقدر TensorFlow التالي ، يُظهر توقيع البادئ ذلك الآن model_dir و image_uri الدعم PipelineVariables، في حين أن المعلمات الأخرى لا تفعل ذلك. لمزيد من المعلومات ، يرجى الرجوع إلى مقدر TensorFlow.
    • قبل:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • بعد:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • جلسة خط الأنابيب - جلسة خط الأنابيب هو مفهوم جديد تم تقديمه لتحقيق الوحدة عبر SageMaker SDK ويقدم تهيئة كسولة لموارد خط الأنابيب (يتم التقاط مكالمات التشغيل ولكن لا يتم تشغيلها حتى يتم إنشاء خط الأنابيب وتشغيله). ال PipelineSession السياق يرث SageMakerSession وتنفذ طرقًا ملائمة لك للتفاعل مع كيانات وموارد SageMaker الأخرى ، مثل وظائف التدريب ونقاط النهاية ومجموعات بيانات الإدخال المخزنة في خدمة تخزين أمازون البسيطة (أمازون S3).
  • توافق الفئة الفرعية مع خطوات مهام مسار العمل - يمكنك الآن إنشاء تجريدات للوظائف وتكوين وتشغيل المعالجة ، والتدريب ، والتحويل ، وضبط الوظائف كما تفعل بدون خط أنابيب.
    • على سبيل المثال ، إنشاء خطوة معالجة باستخدام SKLearnProcessor مطلوب مسبقًا ما يلي:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • كما نرى في الكود السابق ، ProcessingStep يحتاج إلى القيام بنفس منطق المعالجة المسبقة مثل .run، فقط بدون بدء استدعاء API لبدء المهمة. ولكن مع تمكين توافق الفئة الفرعية الآن من خلال خطوات مهام مسار العمل ، فإننا نعلن عن step_args الحجة التي تأخذ منطق المعالجة المسبقة مع .run حتى تتمكن من بناء تجريد الوظيفة وتكوينه كما تستخدمه بدون خطوط الأنابيب. نمر أيضا في pipeline_session، وهو PipelineSession الكائن ، بدلاً من sagemaker_session للتأكد من التقاط مكالمات التشغيل ولكن لا يتم استدعاؤها حتى يتم إنشاء خط الأنابيب وتشغيله. انظر الكود التالي:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • خطوة النموذج (نهج مبسط مع خطوات إنشاء النموذج والتسجيل) - تقدم خطوط الأنابيب نوعين من الخطوات للتكامل مع نماذج SageMaker: CreateModelStep و RegisterModel. يمكنك الآن تحقيق كليهما باستخدام ملف ModelStep يكتب. لاحظ أن أ PipelineSession مطلوب لتحقيق ذلك. هذا يجلب التشابه بين خطوات خط الأنابيب و SDK.
    • قبل:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • بعد:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • خطوة الفشل (التوقف الشرطي لتشغيل خط الأنابيب) - FailStep يسمح بإيقاف خط الأنابيب بحالة فشل إذا تم استيفاء شرط ، مثل إذا كانت درجة النموذج أقل من عتبة معينة.

حل نظرة عامة

في هذا الحل ، نقطة الدخول الخاصة بك هي أمازون ساجميكر ستوديو بيئة التطوير المتكاملة (IDE) للتجارب السريعة. يوفر Studio بيئة لإدارة تجربة خطوط الأنابيب الشاملة. باستخدام الاستوديو ، يمكنك تجاوز ملف وحدة تحكم إدارة AWS لإدارة سير العمل بالكامل. لمزيد من المعلومات حول إدارة خطوط الأنابيب من داخل Studio ، يرجى الرجوع إلى عرض خطوط أنابيب SageMaker وتتبعها وتنفيذها في SageMaker Studio.

يوضح الرسم التخطيطي التالي البنية عالية المستوى لسير عمل ML مع الخطوات المختلفة لتدريب وإنشاء الاستنتاجات باستخدام الميزات الجديدة.

يتضمن خط الأنابيب الخطوات التالية:

  1. بيانات المعالجة المسبقة لإنشاء الميزات المطلوبة وتقسيم البيانات إلى مجموعات بيانات تدريب وتحقق من الصحة واختبار.
  2. قم بإنشاء وظيفة تدريبية باستخدام إطار عمل SageMaker XGBoost.
  3. قم بتقييم النموذج المدرب باستخدام مجموعة بيانات الاختبار.
  4. تحقق مما إذا كانت درجة AUC أعلى من الحد المحدد مسبقًا.
    • إذا كانت درجة AUC أقل من الحد الأدنى ، فقم بإيقاف تشغيل خط الأنابيب وحددها على أنها فاشلة.
    • إذا كانت درجة AUC أكبر من الحد الأدنى ، فقم بإنشاء نموذج SageMaker وقم بتسجيله في سجل نموذج SageMaker.
  5. قم بتطبيق تحويل الدُفعات على مجموعة البيانات المحددة باستخدام النموذج الذي تم إنشاؤه في الخطوة السابقة.

المتطلبات الأساسية المسبقة

لمتابعة هذا المنشور ، تحتاج إلى حساب AWS بامتداد مجال الاستوديو.

يتم دمج خطوط الأنابيب مباشرةً مع كيانات وموارد SageMaker ، لذلك لا تحتاج إلى التفاعل مع أي خدمات AWS أخرى. لا تحتاج أيضًا إلى إدارة أي موارد لأنها خدمة مُدارة بالكامل ، مما يعني أنها تنشئ الموارد وتديرها لك. لمزيد من المعلومات حول مكونات SageMaker المختلفة التي تعد كلا من واجهات برمجة تطبيقات Python المستقلة جنبًا إلى جنب مع المكونات المدمجة في Studio ، راجع صفحة منتج SageMaker.

قبل البدء ، قم بتثبيت إصدار SageMaker SDK> = 2.104.0 و xlrd> = 1.0.0 داخل دفتر Studio باستخدام مقتطف الشفرة التالي:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

سير عمل ML

في هذا المنشور ، تستخدم المكونات التالية:

  • إعداد البيانات
    • معالجة SageMaker - SageMaker Processing هي خدمة مُدارة بالكامل تتيح لك تشغيل تحويلات البيانات المخصصة وهندسة الميزات لأعباء عمل ML.
  • بناء نموذج
  • التدريب النموذجي والتقييم
    • تدريب بنقرة واحدة - ميزة التدريب الموزع SageMaker. يوفر SageMaker مكتبات تدريب موزعة لتوازي البيانات ونموذج التوازي. تم تحسين المكتبات لبيئة تدريب SageMaker ، وتساعد في تكييف وظائف التدريب الموزعة الخاصة بك مع SageMaker ، وتحسين سرعة التدريب والإنتاجية.
    • تجارب SageMaker - التجارب هي قدرة SageMaker التي تتيح لك تنظيم وتتبع ومقارنة وتقييم تكرارات ML الخاصة بك.
    • تحويل دفعة SageMaker - تحويل الدُفعات أو تسجيل النقاط دون اتصال بالإنترنت هي خدمة مُدارة في SageMaker تتيح لك التنبؤ على مجموعة بيانات أكبر باستخدام نماذج ML الخاصة بك.
  • تنسيق سير العمل

خط أنابيب SageMaker عبارة عن سلسلة من الخطوات المترابطة المحددة بواسطة تعريف خط أنابيب JSON. يشفر خط أنابيب باستخدام رسم بياني لا دوري موجه (DAG). يعطي DAG معلومات عن المتطلبات والعلاقات بين كل خطوة من خط الأنابيب ، ويتم تحديد هيكلها من خلال تبعيات البيانات بين الخطوات. يتم إنشاء هذه التبعيات عندما يتم تمرير خصائص مخرجات الخطوة كمدخل إلى خطوة أخرى.

يوضح الرسم البياني التالي الخطوات المختلفة في خط أنابيب SageMaker (لحالة استخدام التنبؤ بالتخبط) حيث يتم استنتاج الروابط بين الخطوات بواسطة SageMaker استنادًا إلى المدخلات والمخرجات المحددة بواسطة تعريفات الخطوة.

تتناول الأقسام التالية إنشاء كل خطوة من خط الأنابيب وتشغيل خط الأنابيب بالكامل بمجرد إنشائه.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.

هيكل المشروع

لنبدأ بهيكل المشروع:

  • / sm-pipelines-end-to-end-example - اسم المشروع
    • /البيانات - مجموعات البيانات
    • / خطوط الأنابيب - ملفات التعليمات البرمجية لمكونات خطوط الأنابيب
      • / العملاء
        • preprocess.py
        • تقييم
    • sagemaker- خطوط الأنابيب- project.ipynb - جهاز كمبيوتر محمول يتجول في سير عمل النمذجة باستخدام ميزات خطوط الأنابيب الجديدة

قم بتنزيل مجموعة البيانات

لمتابعة هذا المنشور ، تحتاج إلى تنزيل وحفظ ملف عينة مجموعة البيانات ضمن مجلد البيانات داخل الدليل الرئيسي للمشروع ، والذي يحفظ الملف بتنسيق نظام ملفات أمازون المرن (Amazon EFS) داخل بيئة الاستوديو.

بناء مكونات خط الأنابيب

أنت الآن جاهز لبناء مكونات خط الأنابيب.

استيراد البيانات وتوضيح المعلمات والثوابت

قم بإنشاء دفتر ملاحظات Studio يسمى sagemaker-pipelines-project.ipynb داخل الدليل الرئيسي للمشروع. أدخل كتلة التعليمات البرمجية التالية في خلية ، وقم بتشغيل الخلية لإعداد كائنات عميل SageMaker و S3 ، وإنشاء PipelineSession، وقم بإعداد موقع حاوية S3 باستخدام الحاوية الافتراضية التي تأتي مع جلسة SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

تدعم خطوط الأنابيب المعلمات ، والتي تسمح لك بتحديد معلمات الإدخال في وقت التشغيل دون تغيير رمز خط الأنابيب الخاص بك. يمكنك استخدام الوحدات المتاحة تحت sagemaker.workflow.parameters وحدة ، مثل ParameterInteger, ParameterFloatو ParameterString، لتحديد معلمات خط الأنابيب لأنواع البيانات المختلفة. قم بتشغيل الكود التالي لإعداد معلمات إدخال متعددة:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

قم بإنشاء مجموعة بيانات دفعة

قم بإنشاء مجموعة بيانات الدُفعة ، والتي تستخدمها لاحقًا في خطوة تحويل الدُفعة:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

تحميل البيانات إلى حاوية S3

قم بتحميل مجموعات البيانات إلى Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

تحديد برنامج نصي للمعالجة وخطوة المعالجة

في هذه الخطوة ، تقوم بإعداد نص برمجي Python للقيام بهندسة الميزات ، وترميز واحد ساخن ، وتنسيق التدريب والتحقق من الصحة واختبار الانقسامات لاستخدامها في بناء النموذج. قم بتشغيل التعليمات البرمجية التالية لإنشاء البرنامج النصي للمعالجة:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

بعد ذلك ، قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء مثيل للمعالج وخطوة خطوط الأنابيب لتشغيل البرنامج النصي للمعالجة. نظرًا لأن نص المعالجة مكتوبًا بلغة Pandas ، فإنك تستخدم ملف معالج SKLearnProcessor. خطوط الأنابيب ProcessingStep تأخذ الدالة الوسيطات التالية: المعالج ، ومواقع الإدخال S3 لمجموعات البيانات الأولية ، ومواقع الإخراج S3 لحفظ مجموعات البيانات المعالجة.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

حدد خطوة التدريب

قم بإعداد تدريب نموذجي باستخدام مقدر SageMaker XGBoost وخطوط الأنابيب TrainingStep وظيفة:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

حدد نص التقييم وخطوة تقييم النموذج

قم بتشغيل كتلة التعليمات البرمجية التالية لتقييم النموذج بمجرد تدريبه. يقوم هذا البرنامج النصي بتغليف المنطق للتحقق مما إذا كانت درجة AUC تفي بالحد المحدد.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

بعد ذلك ، قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء مثيل للمعالج وخطوة خطوط الأنابيب لتشغيل البرنامج النصي للتقييم. نظرًا لأن البرنامج النصي للتقييم يستخدم حزمة XGBoost ، فإنك تستخدم ملف ScriptProcessor مع صورة XGBoost. خطوط الأنابيب ProcessingStep تأخذ الدالة الوسيطات التالية: المعالج ، ومواقع الإدخال S3 لمجموعات البيانات الأولية ، ومواقع الإخراج S3 لحفظ مجموعات البيانات المعالجة.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

حدد خطوة إنشاء نموذج

قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء نموذج SageMaker باستخدام خطوة نموذج خطوط الأنابيب. تستخدم هذه الخطوة ناتج خطوة التدريب لحزم النموذج للنشر. لاحظ أنه يتم تمرير قيمة وسيطة نوع المثيل باستخدام معلمة خطوط الأنابيب التي حددتها مسبقًا في المنشور.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

حدد خطوة تحويل دفعة

قم بتشغيل كتلة التعليمات البرمجية التالية لتشغيل تحويل الدُفعات باستخدام النموذج المدرب مع إدخال الدُفعة الذي تم إنشاؤه في الخطوة الأولى:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

تحديد خطوة نموذج التسجيل

يسجل الكود التالي النموذج في سجل نموذج SageMaker باستخدام خطوة نموذج خطوط الأنابيب:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

حدد خطوة فاشلة لإيقاف خط الأنابيب

يحدد الكود التالي خطوة فشل خطوط الأنابيب لإيقاف تشغيل خط الأنابيب برسالة خطأ إذا كانت درجة AUC لا تفي بالحد المحدد:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

حدد خطوة شرط للتحقق من درجة الجامعة الأمريكية بالقاهرة

يحدد الكود التالي خطوة شرط للتحقق من درجة AUC وإنشاء نموذج مشروط وتشغيل تحويل دفعة وتسجيل نموذج في سجل النموذج ، أو إيقاف تشغيل خط الأنابيب في حالة فشل:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

بناء وتشغيل خط الأنابيب

بعد تحديد كل خطوات المكون ، يمكنك تجميعها في كائن خطوط الأنابيب. لا تحتاج إلى تحديد ترتيب خط الأنابيب لأن خطوط الأنابيب تستنتج تلقائيًا تسلسل الأمر بناءً على التبعيات بين الخطوات.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

قم بتشغيل التعليمات البرمجية التالية في خلية في دفتر ملاحظاتك. إذا كان خط الأنابيب موجودًا بالفعل ، يقوم الكود بتحديث خط الأنابيب. إذا لم يكن خط الأنابيب موجودًا ، فسيتم إنشاء خط جديد.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

وفي الختام

في هذا المنشور ، قدمنا ​​بعض الميزات الجديدة المتوفرة الآن مع خطوط الأنابيب جنبًا إلى جنب مع ميزات SageMaker المضمنة الأخرى وخوارزمية XGBoost لتطوير وتكرار ونشر نموذج للتنبؤ بالتخبط. يمكن توسيع الحل بمصادر بيانات إضافية

لتنفيذ سير عمل ML الخاص بك. لمزيد من التفاصيل حول الخطوات المتوفرة في سير عمل خطوط الأنابيب ، راجع خط أنابيب بناء نموذج Amazon SageMaker و سير عمل SageMaker. أمثلة على برنامج AWS SageMaker يحتوي GitHub repo على المزيد من الأمثلة حول حالات الاستخدام المختلفة باستخدام خطوط الأنابيب.


حول المؤلف

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.جيري بينغ هو مهندس تطوير برمجيات مع AWS SageMaker. وهو يركز على بناء نظام MLOps الشامل من البداية إلى النهاية بدءًا من التدريب وحتى مراقبة النماذج في الإنتاج. كما أنه متحمس لتقديم مفهوم MLOps إلى جمهور أوسع.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.ديوين تشى هو مهندس تطوير برمجيات في AWS. تركز حاليًا على تطوير وتحسين خطوط أنابيب SageMaker. خارج العمل ، تستمتع بممارسة التشيلو.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.غاياتري غناكوتا هو مهندس تعلّم آلي كبير مع خدمات AWS الاحترافية. إنها شغوفة بتطوير ونشر وشرح حلول الذكاء الاصطناعي / تعلم الآلة عبر مختلف المجالات. قبل هذا المنصب ، قادت العديد من المبادرات كعالمة بيانات ومهندسة ML مع كبرى الشركات العالمية في مجال المالية والتجزئة. وهي حاصلة على درجة الماجستير في علوم الكمبيوتر المتخصصة في علوم البيانات من جامعة كولورادو ، بولدر.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.روبيندر جريوال هو مهندس حلول متخصص في الذكاء الإصطناعي / ML مع AWS. يركز حاليًا على خدمة النماذج و MLOps على SageMaker. قبل توليه هذا المنصب ، عمل كمهندس لتعلم الآلة ونماذج البناء والاستضافة. خارج العمل يستمتع بلعب التنس وركوب الدراجات على المسارات الجبلية.

ميزات جديدة لخطوط أنابيب Amazon SageMaker و Amazon SageMaker SDK PlatoBlockchain Data Intelligence. البحث العمودي. عاي.راي لي هو عالم بيانات كبير مع خدمات AWS الاحترافية. يركز تخصصه على بناء حلول AI / ML وتشغيلها للعملاء من مختلف الأحجام ، بدءًا من الشركات الناشئة إلى مؤسسات المؤسسات. خارج العمل ، يتمتع راي باللياقة البدنية والسفر.

الطابع الزمني:

اكثر من التعلم الآلي من AWS