خطوط أنابيب Amazon SageMaker يسمح لعلماء البيانات ومهندسي التعلم الآلي (ML) بأتمتة تدفقات سير العمل التدريبية ، مما يساعدك على إنشاء عملية قابلة للتكرار لتنظيم خطوات تطوير النموذج للتجربة السريعة وإعادة تدريب النموذج. يمكنك أتمتة سير عمل بناء النموذج بالكامل ، بما في ذلك إعداد البيانات وهندسة الميزات والتدريب على النموذج وضبط النموذج والتحقق من صحة النموذج وفهرسته في سجل النموذج. يمكنك تكوين خطوط الأنابيب للتشغيل تلقائيًا على فترات منتظمة أو عند تشغيل أحداث معينة ، أو يمكنك تشغيلها يدويًا حسب الحاجة.
في هذا المنشور ، نسلط الضوء على بعض التحسينات التي تم إدخالها على ملف الأمازون SageMaker SDK وإدخال ميزات جديدة لخطوط أنابيب Amazon SageMaker التي تسهل على ممارسي التعلم الآلي إنشاء نماذج تعلم الآلة وتدريبها.
تواصل Pipelines ابتكار تجربة المطورين الخاصة بها ، ومع هذه الإصدارات الأخيرة ، يمكنك الآن استخدام الخدمة بطريقة أكثر تخصيصًا:
- 2.99.0, 2.101.1, 2.102.0, 2.104.0 - تم تحديث الوثائق
PipelineVariable
استخدام المقدر والمعالج والموالف والمحول وفئات القاعدة النموذجية ونماذج Amazon ونماذج الإطار. ستكون هناك تغييرات إضافية قادمة مع الإصدارات الأحدث من SDK لدعم جميع الفئات الفرعية للمقدرات والمعالجات.
- 2.90.0 - توافر نموذج لإنشاء موارد نموذجية متكاملة ومهام التسجيل.
- 2.88.2 - توافر جلسة خط الأنابيب للتفاعل المُدار مع كيانات وموارد SageMaker.
- 2.88.2 - توافق الفئة الفرعية لـ خطوات عمل خط سير العمل حتى تتمكن من بناء تجريدات الوظائف وتكوين وتشغيل المعالجة ، والتدريب ، والتحويل ، وضبط الوظائف كما تفعل بدون خط أنابيب.
- 2.76.0 - توافر فشل الخطوة للإيقاف المشروط لخط أنابيب بحالة فشل.
في هذا المنشور ، نوجهك عبر سير عمل باستخدام عينة مجموعة بيانات مع التركيز على بناء النماذج ونشرها لتوضيح كيفية تنفيذ ميزات خطوط الأنابيب الجديدة. في النهاية ، يجب أن يكون لديك معلومات كافية لاستخدام هذه الميزات الجديدة بنجاح وتبسيط أعباء عمل ML الخاصة بك.
نظرة عامة على الميزات
تقدم خطوط الأنابيب الميزات الجديدة التالية:
- شرح متغير خط الأنابيب - تقبل معلمات طريقة معينة أنواعًا متعددة من المدخلات ، بما في ذلك
PipelineVariables
، وتمت إضافة وثائق إضافية لتوضيح مكان PipelineVariables
مدعومة في أحدث إصدار ثابت من وثائق SageMaker SDK والتوقيع الأولي للوظائف. على سبيل المثال ، في مقدر TensorFlow التالي ، يُظهر توقيع البادئ ذلك الآن model_dir
و image_uri
الدعم PipelineVariables
، في حين أن المعلمات الأخرى لا تفعل ذلك. لمزيد من المعلومات ، يرجى الرجوع إلى مقدر TensorFlow.
- قبل:
TensorFlow(
py_version=None,
framework_version=None,
model_dir=None,
image_uri=None,
distribution=None,
**kwargs,
)
- بعد:
TensorFlow(
py_version: Union[str, NoneType] = None,
framework_version: Union[str, NoneType] = None,
model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
distribution: Union[Dict[str, str], NoneType] = None,
compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
**kwargs,
)
- جلسة خط الأنابيب - جلسة خط الأنابيب هو مفهوم جديد تم تقديمه لتحقيق الوحدة عبر SageMaker SDK ويقدم تهيئة كسولة لموارد خط الأنابيب (يتم التقاط مكالمات التشغيل ولكن لا يتم تشغيلها حتى يتم إنشاء خط الأنابيب وتشغيله). ال
PipelineSession
السياق يرث SageMakerSession
وتنفذ طرقًا ملائمة لك للتفاعل مع كيانات وموارد SageMaker الأخرى ، مثل وظائف التدريب ونقاط النهاية ومجموعات بيانات الإدخال المخزنة في خدمة تخزين أمازون البسيطة (أمازون S3).
- توافق الفئة الفرعية مع خطوات مهام مسار العمل - يمكنك الآن إنشاء تجريدات للوظائف وتكوين وتشغيل المعالجة ، والتدريب ، والتحويل ، وضبط الوظائف كما تفعل بدون خط أنابيب.
- على سبيل المثال ، إنشاء خطوة معالجة باستخدام
SKLearnProcessor
مطلوب مسبقًا ما يلي:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
role=role,
)
step_process = ProcessingStep(
name="{pipeline-name}-process",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
- كما نرى في الكود السابق ،
ProcessingStep
يحتاج إلى القيام بنفس منطق المعالجة المسبقة مثل .run
، فقط بدون بدء استدعاء API لبدء المهمة. ولكن مع تمكين توافق الفئة الفرعية الآن من خلال خطوات مهام مسار العمل ، فإننا نعلن عن step_args
الحجة التي تأخذ منطق المعالجة المسبقة مع .run حتى تتمكن من بناء تجريد الوظيفة وتكوينه كما تستخدمه بدون خطوط الأنابيب. نمر أيضا في pipeline_session
، وهو PipelineSession
الكائن ، بدلاً من sagemaker_session
للتأكد من التقاط مكالمات التشغيل ولكن لا يتم استدعاؤها حتى يتم إنشاء خط الأنابيب وتشغيله. انظر الكود التالي:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
role=role,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)
- خطوة النموذج (نهج مبسط مع خطوات إنشاء النموذج والتسجيل) - تقدم خطوط الأنابيب نوعين من الخطوات للتكامل مع نماذج SageMaker:
CreateModelStep
و RegisterModel
. يمكنك الآن تحقيق كليهما باستخدام ملف ModelStep
يكتب. لاحظ أن أ PipelineSession
مطلوب لتحقيق ذلك. هذا يجلب التشابه بين خطوات خط الأنابيب و SDK.
- قبل:
step_register = RegisterModel(
name="ChurnRegisterModel",
estimator=xgb_custom_estimator,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.large"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
-
- بعد:
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
- خطوة الفشل (التوقف الشرطي لتشغيل خط الأنابيب) -
FailStep
يسمح بإيقاف خط الأنابيب بحالة فشل إذا تم استيفاء شرط ، مثل إذا كانت درجة النموذج أقل من عتبة معينة.
حل نظرة عامة
في هذا الحل ، نقطة الدخول الخاصة بك هي أمازون ساجميكر ستوديو بيئة التطوير المتكاملة (IDE) للتجارب السريعة. يوفر Studio بيئة لإدارة تجربة خطوط الأنابيب الشاملة. باستخدام الاستوديو ، يمكنك تجاوز ملف وحدة تحكم إدارة AWS لإدارة سير العمل بالكامل. لمزيد من المعلومات حول إدارة خطوط الأنابيب من داخل Studio ، يرجى الرجوع إلى عرض خطوط أنابيب SageMaker وتتبعها وتنفيذها في SageMaker Studio.
يوضح الرسم التخطيطي التالي البنية عالية المستوى لسير عمل ML مع الخطوات المختلفة لتدريب وإنشاء الاستنتاجات باستخدام الميزات الجديدة.
يتضمن خط الأنابيب الخطوات التالية:
- بيانات المعالجة المسبقة لإنشاء الميزات المطلوبة وتقسيم البيانات إلى مجموعات بيانات تدريب وتحقق من الصحة واختبار.
- قم بإنشاء وظيفة تدريبية باستخدام إطار عمل SageMaker XGBoost.
- قم بتقييم النموذج المدرب باستخدام مجموعة بيانات الاختبار.
- تحقق مما إذا كانت درجة AUC أعلى من الحد المحدد مسبقًا.
- إذا كانت درجة AUC أقل من الحد الأدنى ، فقم بإيقاف تشغيل خط الأنابيب وحددها على أنها فاشلة.
- إذا كانت درجة AUC أكبر من الحد الأدنى ، فقم بإنشاء نموذج SageMaker وقم بتسجيله في سجل نموذج SageMaker.
- قم بتطبيق تحويل الدُفعات على مجموعة البيانات المحددة باستخدام النموذج الذي تم إنشاؤه في الخطوة السابقة.
المتطلبات الأساسية المسبقة
لمتابعة هذا المنشور ، تحتاج إلى حساب AWS بامتداد مجال الاستوديو.
يتم دمج خطوط الأنابيب مباشرةً مع كيانات وموارد SageMaker ، لذلك لا تحتاج إلى التفاعل مع أي خدمات AWS أخرى. لا تحتاج أيضًا إلى إدارة أي موارد لأنها خدمة مُدارة بالكامل ، مما يعني أنها تنشئ الموارد وتديرها لك. لمزيد من المعلومات حول مكونات SageMaker المختلفة التي تعد كلا من واجهات برمجة تطبيقات Python المستقلة جنبًا إلى جنب مع المكونات المدمجة في Studio ، راجع صفحة منتج SageMaker.
قبل البدء ، قم بتثبيت إصدار SageMaker SDK> = 2.104.0 و xlrd> = 1.0.0 داخل دفتر Studio باستخدام مقتطف الشفرة التالي:
print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
import sagemaker
سير عمل ML
في هذا المنشور ، تستخدم المكونات التالية:
- إعداد البيانات
- معالجة SageMaker - SageMaker Processing هي خدمة مُدارة بالكامل تتيح لك تشغيل تحويلات البيانات المخصصة وهندسة الميزات لأعباء عمل ML.
- بناء نموذج
- التدريب النموذجي والتقييم
- تدريب بنقرة واحدة - ميزة التدريب الموزع SageMaker. يوفر SageMaker مكتبات تدريب موزعة لتوازي البيانات ونموذج التوازي. تم تحسين المكتبات لبيئة تدريب SageMaker ، وتساعد في تكييف وظائف التدريب الموزعة الخاصة بك مع SageMaker ، وتحسين سرعة التدريب والإنتاجية.
- تجارب SageMaker - التجارب هي قدرة SageMaker التي تتيح لك تنظيم وتتبع ومقارنة وتقييم تكرارات ML الخاصة بك.
- تحويل دفعة SageMaker - تحويل الدُفعات أو تسجيل النقاط دون اتصال بالإنترنت هي خدمة مُدارة في SageMaker تتيح لك التنبؤ على مجموعة بيانات أكبر باستخدام نماذج ML الخاصة بك.
- تنسيق سير العمل
خط أنابيب SageMaker عبارة عن سلسلة من الخطوات المترابطة المحددة بواسطة تعريف خط أنابيب JSON. يشفر خط أنابيب باستخدام رسم بياني لا دوري موجه (DAG). يعطي DAG معلومات عن المتطلبات والعلاقات بين كل خطوة من خط الأنابيب ، ويتم تحديد هيكلها من خلال تبعيات البيانات بين الخطوات. يتم إنشاء هذه التبعيات عندما يتم تمرير خصائص مخرجات الخطوة كمدخل إلى خطوة أخرى.
يوضح الرسم البياني التالي الخطوات المختلفة في خط أنابيب SageMaker (لحالة استخدام التنبؤ بالتخبط) حيث يتم استنتاج الروابط بين الخطوات بواسطة SageMaker استنادًا إلى المدخلات والمخرجات المحددة بواسطة تعريفات الخطوة.
تتناول الأقسام التالية إنشاء كل خطوة من خط الأنابيب وتشغيل خط الأنابيب بالكامل بمجرد إنشائه.
هيكل المشروع
لنبدأ بهيكل المشروع:
- / sm-pipelines-end-to-end-example - اسم المشروع
- /البيانات - مجموعات البيانات
- / خطوط الأنابيب - ملفات التعليمات البرمجية لمكونات خطوط الأنابيب
- sagemaker- خطوط الأنابيب- project.ipynb - جهاز كمبيوتر محمول يتجول في سير عمل النمذجة باستخدام ميزات خطوط الأنابيب الجديدة
قم بتنزيل مجموعة البيانات
لمتابعة هذا المنشور ، تحتاج إلى تنزيل وحفظ ملف عينة مجموعة البيانات ضمن مجلد البيانات داخل الدليل الرئيسي للمشروع ، والذي يحفظ الملف بتنسيق نظام ملفات أمازون المرن (Amazon EFS) داخل بيئة الاستوديو.
بناء مكونات خط الأنابيب
أنت الآن جاهز لبناء مكونات خط الأنابيب.
استيراد البيانات وتوضيح المعلمات والثوابت
قم بإنشاء دفتر ملاحظات Studio يسمى sagemaker-pipelines-project.ipynb
داخل الدليل الرئيسي للمشروع. أدخل كتلة التعليمات البرمجية التالية في خلية ، وقم بتشغيل الخلية لإعداد كائنات عميل SageMaker و S3 ، وإنشاء PipelineSession
، وقم بإعداد موقع حاوية S3 باستخدام الحاوية الافتراضية التي تأتي مع جلسة SageMaker:
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"
تدعم خطوط الأنابيب المعلمات ، والتي تسمح لك بتحديد معلمات الإدخال في وقت التشغيل دون تغيير رمز خط الأنابيب الخاص بك. يمكنك استخدام الوحدات المتاحة تحت sagemaker.workflow.parameters
وحدة ، مثل ParameterInteger
, ParameterFloat
و ParameterString
، لتحديد معلمات خط الأنابيب لأنواع البيانات المختلفة. قم بتشغيل الكود التالي لإعداد معلمات إدخال متعددة:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
input_data = ParameterString(
name="InputData",
default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
قم بإنشاء مجموعة بيانات دفعة
قم بإنشاء مجموعة بيانات الدُفعة ، والتي تستخدمها لاحقًا في خطوة تحويل الدُفعة:
def preprocess_batch_data(file_path):
df = pd.read_csv(file_path)
## Convert to datetime columns
df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
## Drop Rows with null values
df = df.dropna()
## Create Column which gives the days between the last order and the first order
df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
## Create Column which gives the days between when the customer record was created and the first order
df['created'] = pd.to_datetime(df['created'])
df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
## Drop Columns
df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
## Apply one hot encoding on favday and city columns
df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
return df
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)
تحميل البيانات إلى حاوية S3
قم بتحميل مجموعات البيانات إلى Amazon S3:
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")
تحديد برنامج نصي للمعالجة وخطوة المعالجة
في هذه الخطوة ، تقوم بإعداد نص برمجي Python للقيام بهندسة الميزات ، وترميز واحد ساخن ، وتنسيق التدريب والتحقق من الصحة واختبار الانقسامات لاستخدامها في بناء النموذج. قم بتشغيل التعليمات البرمجية التالية لإنشاء البرنامج النصي للمعالجة:
%%writefile pipelines/customerchurn/preprocess.py
import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
base_dir = "/opt/ml/processing"
#Read Data
df = pd.read_csv(
f"{base_dir}/input/storedata_total.csv"
)
# convert created column to datetime
df["created"] = pd.to_datetime(df["created"])
#Convert firstorder and lastorder to datetime datatype
df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
#Drop Rows with Null Values
df = df.dropna()
#Create column which gives the days between the last order and the first order
df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
#Create column which gives the days between the customer record was created and the first order
df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
#Drop columns
df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
#Apply one hot encoding on favday and city columns
df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
# Split into train, validation and test datasets
y = df.pop("retained")
X_pre = df
y_pre = y.to_numpy().reshape(len(y), 1)
X = np.concatenate((y_pre, X_pre), axis=1)
np.random.shuffle(X)
# Split in Train, Test and Validation Datasets
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
train_rows = np.shape(train)[0]
validation_rows = np.shape(validation)[0]
test_rows = np.shape(test)[0]
train = pd.DataFrame(train)
test = pd.DataFrame(test)
validation = pd.DataFrame(validation)
# Convert the label column to integer
train[0] = train[0].astype(int)
test[0] = test[0].astype(int)
validation[0] = validation[0].astype(int)
# Save the Dataframes as csv files
train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
بعد ذلك ، قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء مثيل للمعالج وخطوة خطوط الأنابيب لتشغيل البرنامج النصي للمعالجة. نظرًا لأن نص المعالجة مكتوبًا بلغة Pandas ، فإنك تستخدم ملف معالج SKLearnProcessor. خطوط الأنابيب ProcessingStep
تأخذ الدالة الوسيطات التالية: المعالج ، ومواقع الإدخال S3 لمجموعات البيانات الأولية ، ومواقع الإخراج S3 لحفظ مجموعات البيانات المعالجة.
# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="sklearn-churn-process",
role=role,
sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
destination=f"s3://{default_bucket}/output/train" ),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
destination=f"s3://{default_bucket}/output/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
destination=f"s3://{default_bucket}/output/test")
],
code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)
حدد خطوة التدريب
قم بإعداد تدريب نموذجي باستخدام مقدر SageMaker XGBoost وخطوط الأنابيب TrainingStep
وظيفة:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path=model_path,
role=role,
sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
)
train_args = xgb_train.fit(
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv",
),
},
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="ChurnModelTrain",
step_args=train_args,
)
حدد نص التقييم وخطوة تقييم النموذج
قم بتشغيل كتلة التعليمات البرمجية التالية لتقييم النموذج بمجرد تدريبه. يقوم هذا البرنامج النصي بتغليف المنطق للتحقق مما إذا كانت درجة AUC تفي بالحد المحدد.
%%writefile pipelines/customerchurn/evaluate.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":
#Read Model Tar File
model_path = f"/opt/ml/processing/model/model.tar.gz"
with tarfile.open(model_path) as tar:
tar.extractall(path=".")
model = pickle.load(open("xgboost-model", "rb"))
#Read Test Data using which we evaluate the model
test_path = "/opt/ml/processing/test/test.csv"
df = pd.read_csv(test_path, header=None)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = xgboost.DMatrix(df.values)
#Run Predictions
predictions = model.predict(X_test)
#Evaluate Predictions
fpr, tpr, thresholds = roc_curve(y_test, predictions)
auc_score = auc(fpr, tpr)
report_dict = {
"classification_metrics": {
"auc_score": {
"value": auc_score,
},
},
}
#Save Evaluation Report
output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
f.write(json.dumps(report_dict))
بعد ذلك ، قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء مثيل للمعالج وخطوة خطوط الأنابيب لتشغيل البرنامج النصي للتقييم. نظرًا لأن البرنامج النصي للتقييم يستخدم حزمة XGBoost ، فإنك تستخدم ملف ScriptProcessor
مع صورة XGBoost. خطوط الأنابيب ProcessingStep
تأخذ الدالة الوسيطات التالية: المعالج ، ومواقع الإدخال S3 لمجموعات البيانات الأولية ، ومواقع الإخراج S3 لحفظ مجموعات البيانات المعالجة.
#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-churn-eval",
role=role,
sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model",
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
destination=f"s3://{default_bucket}/output/evaluation"),
],
code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
name="ChurnEvalModel",
step_args=eval_args,
property_files=[evaluation_report],
)
حدد خطوة إنشاء نموذج
قم بتشغيل كتلة التعليمات البرمجية التالية لإنشاء نموذج SageMaker باستخدام خطوة نموذج خطوط الأنابيب. تستخدم هذه الخطوة ناتج خطوة التدريب لحزم النموذج للنشر. لاحظ أنه يتم تمرير قيمة وسيطة نوع المثيل باستخدام معلمة خطوط الأنابيب التي حددتها مسبقًا في المنشور.
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
step_create_model = ModelStep(
name="ChurnCreateModel",
step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)
حدد خطوة تحويل دفعة
قم بتشغيل كتلة التعليمات البرمجية التالية لتشغيل تحويل الدُفعات باستخدام النموذج المدرب مع إدخال الدُفعة الذي تم إنشاؤه في الخطوة الأولى:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/ChurnTransform",
sagemaker_session=pipeline_session
)
step_transform = TransformStep(
name="ChurnTransform",
step_args=transformer.transform(
data=batch_data,
content_type="text/csv"
)
)
تحديد خطوة نموذج التسجيل
يسجل الكود التالي النموذج في سجل نموذج SageMaker باستخدام خطوة نموذج خطوط الأنابيب:
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json",
)
)
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
حدد خطوة فاشلة لإيقاف خط الأنابيب
يحدد الكود التالي خطوة فشل خطوط الأنابيب لإيقاف تشغيل خط الأنابيب برسالة خطأ إذا كانت درجة AUC لا تفي بالحد المحدد:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
name="ChurnAUCScoreFail",
error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
)
حدد خطوة شرط للتحقق من درجة الجامعة الأمريكية بالقاهرة
يحدد الكود التالي خطوة شرط للتحقق من درجة AUC وإنشاء نموذج مشروط وتشغيل تحويل دفعة وتسجيل نموذج في سجل النموذج ، أو إيقاف تشغيل خط الأنابيب في حالة فشل:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
left=JsonGet(
step_name=step_eval.name,
property_file=evaluation_report,
json_path="classification_metrics.auc_score.value",
),
right=auc_score_threshold,
)
step_cond = ConditionStep(
name="CheckAUCScoreChurnEvaluation",
conditions=[cond_lte],
if_steps=[step_register, step_create_model, step_transform],
else_steps=[step_fail],
)
بناء وتشغيل خط الأنابيب
بعد تحديد كل خطوات المكون ، يمكنك تجميعها في كائن خطوط الأنابيب. لا تحتاج إلى تحديد ترتيب خط الأنابيب لأن خطوط الأنابيب تستنتج تلقائيًا تسلسل الأمر بناءً على التبعيات بين الخطوات.
import json
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
processing_instance_type,
training_instance_type,
model_approval_status,
input_data,
batch_data,
auc_score_threshold,
],
steps=[step_process, step_train, step_eval, step_cond],
)
definition = json.loads(pipeline.definition())
print(definition)
قم بتشغيل التعليمات البرمجية التالية في خلية في دفتر ملاحظاتك. إذا كان خط الأنابيب موجودًا بالفعل ، يقوم الكود بتحديث خط الأنابيب. إذا لم يكن خط الأنابيب موجودًا ، فسيتم إنشاء خط جديد.
pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution
وفي الختام
في هذا المنشور ، قدمنا بعض الميزات الجديدة المتوفرة الآن مع خطوط الأنابيب جنبًا إلى جنب مع ميزات SageMaker المضمنة الأخرى وخوارزمية XGBoost لتطوير وتكرار ونشر نموذج للتنبؤ بالتخبط. يمكن توسيع الحل بمصادر بيانات إضافية
لتنفيذ سير عمل ML الخاص بك. لمزيد من التفاصيل حول الخطوات المتوفرة في سير عمل خطوط الأنابيب ، راجع خط أنابيب بناء نموذج Amazon SageMaker و سير عمل SageMaker. أمثلة على برنامج AWS SageMaker يحتوي GitHub repo على المزيد من الأمثلة حول حالات الاستخدام المختلفة باستخدام خطوط الأنابيب.
حول المؤلف
جيري بينغ هو مهندس تطوير برمجيات مع AWS SageMaker. وهو يركز على بناء نظام MLOps الشامل من البداية إلى النهاية بدءًا من التدريب وحتى مراقبة النماذج في الإنتاج. كما أنه متحمس لتقديم مفهوم MLOps إلى جمهور أوسع.
ديوين تشى هو مهندس تطوير برمجيات في AWS. تركز حاليًا على تطوير وتحسين خطوط أنابيب SageMaker. خارج العمل ، تستمتع بممارسة التشيلو.
غاياتري غناكوتا هو مهندس تعلّم آلي كبير مع خدمات AWS الاحترافية. إنها شغوفة بتطوير ونشر وشرح حلول الذكاء الاصطناعي / تعلم الآلة عبر مختلف المجالات. قبل هذا المنصب ، قادت العديد من المبادرات كعالمة بيانات ومهندسة ML مع كبرى الشركات العالمية في مجال المالية والتجزئة. وهي حاصلة على درجة الماجستير في علوم الكمبيوتر المتخصصة في علوم البيانات من جامعة كولورادو ، بولدر.
روبيندر جريوال هو مهندس حلول متخصص في الذكاء الإصطناعي / ML مع AWS. يركز حاليًا على خدمة النماذج و MLOps على SageMaker. قبل توليه هذا المنصب ، عمل كمهندس لتعلم الآلة ونماذج البناء والاستضافة. خارج العمل يستمتع بلعب التنس وركوب الدراجات على المسارات الجبلية.
راي لي هو عالم بيانات كبير مع خدمات AWS الاحترافية. يركز تخصصه على بناء حلول AI / ML وتشغيلها للعملاء من مختلف الأحجام ، بدءًا من الشركات الناشئة إلى مؤسسات المؤسسات. خارج العمل ، يتمتع راي باللياقة البدنية والسفر.