ท่อส่ง Amazon SageMaker อนุญาตให้นักวิทยาศาสตร์ข้อมูลและวิศวกรแมชชีนเลิร์นนิง (ML) กำหนดเวิร์กโฟลว์การฝึกอบรมโดยอัตโนมัติ ซึ่งช่วยให้คุณสร้างกระบวนการที่ทำซ้ำได้เพื่อควบคุมขั้นตอนการพัฒนาแบบจำลองสำหรับการทดลองอย่างรวดเร็วและการฝึกจำลองใหม่ คุณสามารถทำให้เวิร์กโฟลว์การสร้างแบบจำลองทั้งหมดเป็นแบบอัตโนมัติ รวมถึงการจัดเตรียมข้อมูล วิศวกรรมคุณลักษณะ การฝึกอบรมแบบจำลอง การปรับแต่งแบบจำลอง และการตรวจสอบแบบจำลอง และแค็ตตาล็อกในรีจีสทรีแบบจำลอง คุณสามารถกำหนดค่าไปป์ไลน์ให้ทำงานโดยอัตโนมัติตามช่วงเวลาปกติหรือเมื่อมีการทริกเกอร์เหตุการณ์บางอย่าง หรือคุณสามารถเรียกใช้ด้วยตนเองได้ตามต้องการ
ในโพสต์นี้ เราเน้นย้ำถึงการปรับปรุงบางอย่างของ อเมซอน SageMaker SDK และแนะนำคุณสมบัติใหม่ของ Amazon SageMaker Pipelines ที่ช่วยให้ผู้ปฏิบัติงาน ML สร้างและฝึกอบรมโมเดล ML ได้ง่ายขึ้น
Pipelines ยังคงสร้างสรรค์ประสบการณ์ของนักพัฒนาอย่างต่อเนื่อง และด้วยการเปิดตัวล่าสุดเหล่านี้ ตอนนี้คุณสามารถใช้บริการในรูปแบบที่กำหนดเองมากขึ้น:
- 2.99.0, 2.101.1, 2.102.0, 2.104.0 – อัปเดตเอกสารเกี่ยวกับ
PipelineVariable
การใช้งานสำหรับคลาสพื้นฐานของตัวประมาณ โปรเซสเซอร์ จูนเนอร์ หม้อแปลงและโมเดล โมเดล Amazon และโมเดลเฟรมเวิร์ก จะมีการเปลี่ยนแปลงเพิ่มเติมที่มาพร้อมกับ SDK เวอร์ชันใหม่กว่าเพื่อรองรับคลาสย่อยทั้งหมดของตัวประมาณและตัวประมวลผล
- 2.90.0 – ความพร้อมใช้งานของ รุ่นStep สำหรับงานสร้างทรัพยากรแบบจำลองและการลงทะเบียนแบบบูรณาการ
- 2.88.2 – ความพร้อมใช้งานของ เซสชันไปป์ไลน์ สำหรับการโต้ตอบที่มีการจัดการกับเอนทิตีและทรัพยากรของ SageMaker
- 2.88.2 – ความเข้ากันได้ของคลาสย่อยสำหรับ ขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ เพื่อให้คุณสามารถสร้างงานนามธรรมและกำหนดค่าและเรียกใช้การประมวลผล การฝึกอบรม การแปลง และการปรับแต่งงานได้เหมือนที่คุณทำโดยไม่มีไปป์ไลน์
- 2.76.0 – ความพร้อมใช้งานของ ขั้นตอนล้มเหลว เพื่อหยุดไปป์ไลน์ที่มีสถานะล้มเหลวตามเงื่อนไข
ในโพสต์นี้ เราจะแนะนำคุณเกี่ยวกับเวิร์กโฟลว์โดยใช้ชุดข้อมูลตัวอย่างโดยเน้นที่การสร้างแบบจำลองและการปรับใช้เพื่อสาธิตวิธีการใช้คุณลักษณะใหม่ของไปป์ไลน์ ในตอนท้าย คุณควรมีข้อมูลเพียงพอที่จะใช้คุณลักษณะใหม่เหล่านี้ได้สำเร็จและทำให้ปริมาณงาน ML ของคุณง่ายขึ้น
ภาพรวมคุณสมบัติ
Pipelines นำเสนอคุณสมบัติใหม่ดังต่อไปนี้:
- คำอธิบายประกอบตัวแปรไปป์ไลน์ – พารามิเตอร์เมธอดบางตัวยอมรับอินพุตหลายประเภท รวมถึง
PipelineVariables
และมีการเพิ่มเอกสารเพิ่มเติมเพื่อชี้แจงว่า PipelineVariables
ได้รับการสนับสนุนทั้งในเอกสาร SageMaker SDK เวอร์ชันเสถียรล่าสุดและลายเซ็นเริ่มต้นของฟังก์ชัน ตัวอย่างเช่น ในตัวประมาณค่า TensorFlow ต่อไปนี้ ลายเซ็นเริ่มต้นจะแสดงว่า model_dir
และ image_uri
สนับสนุน PipelineVariables
ในขณะที่พารามิเตอร์อื่นไม่ทำ สำหรับข้อมูลเพิ่มเติม โปรดดูที่ เครื่องมือประมาณการเทนเซอร์โฟลว์.
- ก่อน:
TensorFlow(
py_version=None,
framework_version=None,
model_dir=None,
image_uri=None,
distribution=None,
**kwargs,
)
- หลังจากที่:
TensorFlow(
py_version: Union[str, NoneType] = None,
framework_version: Union[str, NoneType] = None,
model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
distribution: Union[Dict[str, str], NoneType] = None,
compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
**kwargs,
)
- เซสชั่นไปป์ไลน์ - เซสชันไปป์ไลน์ เป็นแนวคิดใหม่ที่นำมาใช้เพื่อนำความสามัคคีใน SageMaker SDK และแนะนำการเริ่มต้นใช้งานทรัพยากรไปป์ไลน์แบบขี้เกียจ ดิ
PipelineSession
บริบทสืบทอด SageMakerSession
และใช้วิธีที่สะดวกสำหรับคุณในการโต้ตอบกับเอนทิตีและทรัพยากร SageMaker อื่นๆ เช่น งานฝึกอบรม จุดปลาย และชุดข้อมูลอินพุตที่จัดเก็บไว้ใน บริการจัดเก็บข้อมูลอย่างง่ายของ Amazon (อเมซอน เอส3).
- ความเข้ากันได้ของคลาสย่อยกับขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ – ตอนนี้คุณสามารถสร้างนามธรรมของงานและกำหนดค่าและเรียกใช้การประมวลผล การฝึกอบรม การแปลง และการปรับแต่งงานได้ตามที่คุณต้องการโดยไม่ต้องใช้ไปป์ไลน์
- ตัวอย่างเช่น การสร้างขั้นตอนการประมวลผลด้วย
SKLearnProcessor
ก่อนหน้านี้ต้องการสิ่งต่อไปนี้:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
role=role,
)
step_process = ProcessingStep(
name="{pipeline-name}-process",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
- ดังที่เราเห็นในรหัสก่อนหน้า
ProcessingStep
ต้องทำโดยพื้นฐานตรรกะการประมวลผลล่วงหน้าเช่นเดียวกับ .run
โดยไม่ต้องเริ่มต้นการเรียก API เพื่อเริ่มงาน แต่ด้วยความเข้ากันได้ของคลาสย่อยในขณะนี้ที่เปิดใช้งานด้วยขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ เราประกาศ step_args
อาร์กิวเมนต์ที่ใช้ตรรกะก่อนการประมวลผลด้วย .run เพื่อให้คุณสามารถสร้างสิ่งที่เป็นนามธรรมของงานและกำหนดค่าตามที่คุณต้องการใช้โดยไม่มีไปป์ไลน์ นอกจากนี้เรายังผ่านใน pipeline_session
ซึ่งเป็น PipelineSession
วัตถุ แทน sagemaker_session
เพื่อให้แน่ใจว่ามีการเรียกใช้การโทรแต่จะไม่ถูกเรียกจนกว่าจะมีการสร้างและเรียกใช้ไปป์ไลน์ ดูรหัสต่อไปนี้:
sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
role=role,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code=f"code/preprocess.py",
)
step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)
- ขั้นตอนแบบจำลอง (แนวทางที่คล่องตัวด้วยขั้นตอนการสร้างแบบจำลองและการลงทะเบียน) –Pipelines มีขั้นตอนสองประเภทเพื่อรวมเข้ากับรุ่น SageMaker:
CreateModelStep
และ RegisterModel
. ตอนนี้คุณสามารถบรรลุทั้งสองได้โดยใช้เพียง ModelStep
พิมพ์. สังเกตว่า a PipelineSession
จำเป็นเพื่อให้บรรลุสิ่งนี้ สิ่งนี้ทำให้เกิดความคล้ายคลึงกันระหว่างขั้นตอนไปป์ไลน์และ SDK
- ก่อน:
step_register = RegisterModel(
name="ChurnRegisterModel",
estimator=xgb_custom_estimator,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.large"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
-
- หลังจากที่:
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
- ขั้นตอนที่ล้มเหลว (หยุดตามเงื่อนไขของการรันไปป์ไลน์) -
FailStep
อนุญาตให้ไปป์ไลน์หยุดทำงานโดยมีสถานะล้มเหลวหากตรงตามเงื่อนไข เช่น หากคะแนนโมเดลต่ำกว่าเกณฑ์ที่กำหนด
ภาพรวมโซลูชัน
ในโซลูชันนี้ จุดเริ่มต้นของคุณคือ สตูดิโอ Amazon SageMaker สภาพแวดล้อมการพัฒนาแบบบูรณาการ (IDE) สำหรับการทดลองอย่างรวดเร็ว Studio เสนอสภาพแวดล้อมในการจัดการประสบการณ์ไปป์ไลน์แบบ end-to-end ด้วย Studio คุณสามารถเลี่ยง คอนโซลการจัดการ AWS สำหรับการจัดการเวิร์กโฟลว์ทั้งหมดของคุณ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการจัดการไปป์ไลน์จากภายใน Studio โปรดดูที่ ดู ติดตาม และดำเนินการไปป์ไลน์ SageMaker ใน SageMaker Studio.
ไดอะแกรมต่อไปนี้แสดงสถาปัตยกรรมระดับสูงของเวิร์กโฟลว์ ML พร้อมขั้นตอนต่างๆ ในการฝึกและสร้างการอนุมานโดยใช้คุณลักษณะใหม่
ไปป์ไลน์มีขั้นตอนต่อไปนี้:
- ประมวลผลข้อมูลล่วงหน้าเพื่อสร้างคุณสมบัติที่จำเป็นและแยกข้อมูลออกเป็นชุดข้อมูลฝึก การตรวจสอบ และทดสอบ
- สร้างงานการฝึกอบรมด้วยเฟรมเวิร์ก SageMaker XGBoost
- ประเมินแบบจำลองที่ได้รับการฝึกอบรมโดยใช้ชุดข้อมูลทดสอบ
- ตรวจสอบว่าคะแนน AUC สูงกว่าเกณฑ์ที่กำหนดไว้ล่วงหน้าหรือไม่
- หากคะแนน AUC น้อยกว่าเกณฑ์ ให้หยุดการทำงานของไปป์ไลน์และทำเครื่องหมายว่าล้มเหลว
- หากคะแนน AUC มากกว่าเกณฑ์ ให้สร้างโมเดล SageMaker และลงทะเบียนในรีจีสทรีโมเดล SageMaker
- ใช้การแปลงเป็นชุดในชุดข้อมูลที่กำหนดโดยใช้แบบจำลองที่สร้างในขั้นตอนก่อนหน้า
เบื้องต้น
เพื่อติดตามโพสต์นี้ คุณต้องมีบัญชี AWS ที่มี a โดเมนสตูดิโอ.
Pipelines ถูกรวมเข้ากับเอนทิตีและทรัพยากรของ SageMaker โดยตรง ดังนั้นคุณไม่จำเป็นต้องโต้ตอบกับบริการอื่นๆ ของ AWS คุณไม่จำเป็นต้องจัดการทรัพยากรใดๆ เนื่องจากเป็นบริการที่มีการจัดการเต็มรูปแบบ ซึ่งหมายความว่าจะสร้างและจัดการทรัพยากรให้กับคุณ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับส่วนประกอบ SageMaker ต่างๆ ที่เป็นทั้ง Python API แบบสแตนด์อโลนพร้อมกับส่วนประกอบที่ผสานรวมของ Studio ให้ดูที่ หน้าผลิตภัณฑ์ SageMaker.
ก่อนเริ่มต้น ให้ติดตั้ง SageMaker SDK เวอร์ชัน >= 2.104.0 และ xlrd >=1.0.0 ภายในโน้ตบุ๊ก Studio โดยใช้ข้อมูลโค้ดต่อไปนี้:
print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
import sagemaker
ML เวิร์กโฟลว์
สำหรับโพสต์นี้ คุณใช้ส่วนประกอบต่อไปนี้:
- การเตรียมข้อมูล
- การประมวลผล SageMaker – การประมวลผล SageMaker เป็นบริการที่มีการจัดการเต็มรูปแบบ ซึ่งช่วยให้คุณสามารถเรียกใช้การแปลงข้อมูลแบบกำหนดเองและวิศวกรรมคุณลักษณะสำหรับปริมาณงาน ML
- การสร้างแบบจำลอง
- การฝึกแบบจำลองและการประเมิน
- การฝึกอบรมในคลิกเดียว – SageMaker แจกจ่ายคุณสมบัติการฝึกอบรม SageMaker จัดเตรียมไลบรารีการฝึกอบรมแบบกระจายสำหรับการขนานข้อมูลและความเท่าเทียมของแบบจำลอง ไลบรารีได้รับการปรับให้เหมาะสมสำหรับสภาพแวดล้อมการฝึกอบรม SageMaker ช่วยปรับงานการฝึกอบรมแบบกระจายไปยัง SageMaker และปรับปรุงความเร็วในการฝึกอบรมและปริมาณงาน
- การทดลอง SageMaker – Experiments เป็นความสามารถของ SageMaker ที่ให้คุณจัดระเบียบ ติดตาม เปรียบเทียบ และประเมินการวนซ้ำ ML ของคุณ
- การแปลงแบทช์ SageMaker – การแปลงแบทช์หรือการให้คะแนนแบบออฟไลน์เป็นบริการที่มีการจัดการใน SageMaker ที่ให้คุณคาดการณ์ชุดข้อมูลขนาดใหญ่ขึ้นโดยใช้โมเดล ML ของคุณ
- การประสานเวิร์กโฟลว์
ไปป์ไลน์ SageMaker คือชุดของขั้นตอนที่เชื่อมต่อถึงกันซึ่งกำหนดโดยคำจำกัดความไปป์ไลน์ JSON มันเข้ารหัสไปป์ไลน์โดยใช้กราฟ acyclic กำกับ (DAG) DAG ให้ข้อมูลเกี่ยวกับข้อกำหนดและความสัมพันธ์ระหว่างแต่ละขั้นตอนของไปป์ไลน์ และโครงสร้างถูกกำหนดโดยการอ้างอิงข้อมูลระหว่างขั้นตอนต่างๆ การขึ้นต่อกันเหล่านี้ถูกสร้างขึ้นเมื่อคุณสมบัติของเอาต์พุตของขั้นตอนถูกส่งผ่านเป็นอินพุตไปยังอีกขั้นตอนหนึ่ง
ไดอะแกรมต่อไปนี้แสดงขั้นตอนต่างๆ ในไปป์ไลน์ SageMaker (สำหรับกรณีการใช้งานการคาดการณ์แบบปั่นป่วน) ซึ่ง SageMaker อนุมานการเชื่อมต่อระหว่างขั้นตอนต่างๆ ตามอินพุตและเอาต์พุตที่กำหนดโดยข้อกำหนดขั้นตอน
ส่วนถัดไปจะอธิบายเกี่ยวกับการสร้างแต่ละขั้นตอนของไปป์ไลน์และเรียกใช้ไปป์ไลน์ทั้งหมดเมื่อสร้างเสร็จ
โครงสร้างโครงการ
เริ่มจากโครงสร้างโครงการกันก่อน:
- / sm-pipelines-end-to-end-example – ชื่อโครงการ
- /ข้อมูล – ชุดข้อมูล
- /ท่อส่ง – ไฟล์โค้ดสำหรับส่วนประกอบไปป์ไลน์
- /ลูกค้ากำลังจะเปลี่ยนใจจากคุณ
- sagemaker-ไปป์ไลน์-project.ipynb – โน้ตบุ๊กที่เดินผ่านเวิร์กโฟลว์การสร้างแบบจำลองโดยใช้คุณสมบัติใหม่ของ Pipelines
ดาวน์โหลดชุดข้อมูล
หากต้องการติดตามบทความนี้ คุณต้องดาวน์โหลดและบันทึกไฟล์ ชุดข้อมูลตัวอย่าง ภายใต้โฟลเดอร์ data ภายในโฮมไดเร็กทอรีของโครงการ ซึ่งบันทึกไฟล์ใน ระบบไฟล์ Amazon Elastic (Amazon EFS) ภายในสภาพแวดล้อมของ Studio
สร้างส่วนประกอบไปป์ไลน์
ตอนนี้คุณพร้อมที่จะสร้างส่วนประกอบไปป์ไลน์แล้ว
นำเข้าคำสั่งและประกาศพารามิเตอร์และค่าคงที่
สร้างสมุดบันทึก Studio ชื่อ sagemaker-pipelines-project.ipynb
ภายในโฮมไดเร็กทอรีของโครงการ ป้อนบล็อกโค้ดต่อไปนี้ในเซลล์ และเรียกใช้เซลล์เพื่อตั้งค่าออบเจ็กต์ไคลเอ็นต์ SageMaker และ S3 สร้าง PipelineSession
และตั้งค่าตำแหน่งบัคเก็ต S3 โดยใช้บัคเก็ตเริ่มต้นที่มาพร้อมกับเซสชัน SageMaker:
import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"
ไปป์ไลน์รองรับการกำหนดพารามิเตอร์ ซึ่งช่วยให้คุณระบุพารามิเตอร์อินพุตขณะรันไทม์โดยไม่ต้องเปลี่ยนโค้ดไปป์ไลน์ คุณสามารถใช้โมดูลที่มีอยู่ภายใต้ sagemaker.workflow.parameters
โมดูล เช่น ParameterInteger
, ParameterFloat
และ ParameterString
เพื่อระบุพารามิเตอร์ไปป์ไลน์ของประเภทข้อมูลต่างๆ รันโค้ดต่อไปนี้เพื่อตั้งค่าพารามิเตอร์อินพุตหลายตัว:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
input_data = ParameterString(
name="InputData",
default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
model_approval_status = ParameterString(
name="ModelApprovalStatus", default_value="PendingManualApproval"
)
สร้างชุดข้อมูลแบทช์
สร้างชุดข้อมูลชุดงาน ซึ่งคุณใช้ภายหลังในขั้นตอนการแปลงชุดงาน:
def preprocess_batch_data(file_path):
df = pd.read_csv(file_path)
## Convert to datetime columns
df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
## Drop Rows with null values
df = df.dropna()
## Create Column which gives the days between the last order and the first order
df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
## Create Column which gives the days between when the customer record was created and the first order
df['created'] = pd.to_datetime(df['created'])
df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
## Drop Columns
df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
## Apply one hot encoding on favday and city columns
df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
return df
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)
อัปโหลดข้อมูลไปยังบัคเก็ต S3
อัปโหลดชุดข้อมูลไปยัง Amazon S3:
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")
กำหนดสคริปต์การประมวลผลและขั้นตอนการประมวลผล
ในขั้นตอนนี้ คุณต้องเตรียมสคริปต์ Python เพื่อทำวิศวกรรมคุณลักษณะ การเข้ารหัสแบบด่วนหนึ่งครั้ง และดูแลจัดการการฝึกอบรม การตรวจสอบ และการแยกการทดสอบเพื่อใช้สำหรับการสร้างแบบจำลอง รันโค้ดต่อไปนี้เพื่อสร้างสคริปต์การประมวลผลของคุณ:
%%writefile pipelines/customerchurn/preprocess.py
import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
base_dir = "/opt/ml/processing"
#Read Data
df = pd.read_csv(
f"{base_dir}/input/storedata_total.csv"
)
# convert created column to datetime
df["created"] = pd.to_datetime(df["created"])
#Convert firstorder and lastorder to datetime datatype
df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
#Drop Rows with Null Values
df = df.dropna()
#Create column which gives the days between the last order and the first order
df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
#Create column which gives the days between the customer record was created and the first order
df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
#Drop columns
df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
#Apply one hot encoding on favday and city columns
df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
# Split into train, validation and test datasets
y = df.pop("retained")
X_pre = df
y_pre = y.to_numpy().reshape(len(y), 1)
X = np.concatenate((y_pre, X_pre), axis=1)
np.random.shuffle(X)
# Split in Train, Test and Validation Datasets
train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
train_rows = np.shape(train)[0]
validation_rows = np.shape(validation)[0]
test_rows = np.shape(test)[0]
train = pd.DataFrame(train)
test = pd.DataFrame(test)
validation = pd.DataFrame(validation)
# Convert the label column to integer
train[0] = train[0].astype(int)
test[0] = test[0].astype(int)
validation[0] = validation[0].astype(int)
# Save the Dataframes as csv files
train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
ถัดไป เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างอินสแตนซ์ตัวประมวลผล และขั้นตอนไปป์ไลน์เพื่อเรียกใช้สคริปต์การประมวลผล เนื่องจากสคริปต์การประมวลผลเขียนด้วย Pandas คุณจึงใช้ a SKLearn โปรเซสเซอร์. ท่อส่ง ProcessingStep
ฟังก์ชันรับอาร์กิวเมนต์ต่อไปนี้: ตัวประมวลผล ตำแหน่งอินพุต S3 สำหรับชุดข้อมูลดิบ และตำแหน่งเอาต์พุต S3 เพื่อบันทึกชุดข้อมูลที่ประมวลผล
# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="sklearn-churn-process",
role=role,
sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
destination=f"s3://{default_bucket}/output/train" ),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
destination=f"s3://{default_bucket}/output/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
destination=f"s3://{default_bucket}/output/test")
],
code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)
กำหนดขั้นตอนการฝึก
ตั้งค่าการฝึกโมเดลโดยใช้ตัวประมาณ SageMaker XGBoost และไปป์ไลน์ TrainingStep
ฟังก์ชั่น:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path=model_path,
role=role,
sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
)
train_args = xgb_train.fit(
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv",
),
},
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
name="ChurnModelTrain",
step_args=train_args,
)
กำหนดสคริปต์การประเมินและขั้นตอนการประเมินแบบจำลอง
เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อประเมินโมเดลเมื่อฝึกแล้ว สคริปต์นี้สรุปตรรกะเพื่อตรวจสอบว่าคะแนน AUC ตรงตามเกณฑ์ที่ระบุหรือไม่
%%writefile pipelines/customerchurn/evaluate.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":
#Read Model Tar File
model_path = f"/opt/ml/processing/model/model.tar.gz"
with tarfile.open(model_path) as tar:
tar.extractall(path=".")
model = pickle.load(open("xgboost-model", "rb"))
#Read Test Data using which we evaluate the model
test_path = "/opt/ml/processing/test/test.csv"
df = pd.read_csv(test_path, header=None)
y_test = df.iloc[:, 0].to_numpy()
df.drop(df.columns[0], axis=1, inplace=True)
X_test = xgboost.DMatrix(df.values)
#Run Predictions
predictions = model.predict(X_test)
#Evaluate Predictions
fpr, tpr, thresholds = roc_curve(y_test, predictions)
auc_score = auc(fpr, tpr)
report_dict = {
"classification_metrics": {
"auc_score": {
"value": auc_score,
},
},
}
#Save Evaluation Report
output_dir = "/opt/ml/processing/evaluation"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
evaluation_path = f"{output_dir}/evaluation.json"
with open(evaluation_path, "w") as f:
f.write(json.dumps(report_dict))
ถัดไป เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างอินสแตนซ์ตัวประมวลผล และขั้นตอนไปป์ไลน์เพื่อเรียกใช้สคริปต์การประเมิน เนื่องจากสคริปต์การประเมินใช้แพ็คเกจ XGBoost คุณจึงใช้ a ScriptProcessor
พร้อมกับภาพ XGBoost ท่อส่ง ProcessingStep
ฟังก์ชันรับอาร์กิวเมนต์ต่อไปนี้: ตัวประมวลผล ตำแหน่งอินพุต S3 สำหรับชุดข้อมูลดิบ และตำแหน่งเอาต์พุต S3 เพื่อบันทึกชุดข้อมูลที่ประมวลผล
#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-churn-eval",
role=role,
sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
inputs=[
ProcessingInput(
source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
destination="/opt/ml/processing/model",
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
destination=f"s3://{default_bucket}/output/evaluation"),
],
code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
name="ChurnEvalModel",
step_args=eval_args,
property_files=[evaluation_report],
)
กำหนดขั้นตอนการสร้างแบบจำลอง
เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างโมเดล SageMaker โดยใช้ขั้นตอนโมเดลไปป์ไลน์ ขั้นตอนนี้ใช้ผลลัพธ์ของขั้นตอนการฝึกอบรมเพื่อจัดแพ็คเกจโมเดลสำหรับการปรับใช้ โปรดทราบว่าค่าสำหรับอาร์กิวเมนต์ประเภทอินสแตนซ์จะถูกส่งโดยใช้พารามิเตอร์ไปป์ไลน์ที่คุณกำหนดไว้ก่อนหน้าในโพสต์
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
step_create_model = ModelStep(
name="ChurnCreateModel",
step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)
กำหนดขั้นตอนการแปลงแบทช์
รันบล็อคโค้ดต่อไปนี้เพื่อรันการแปลงแบบแบตช์โดยใช้โมเดลที่ได้รับการฝึกด้วยอินพุตแบทช์ที่สร้างขึ้นในขั้นตอนแรก:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/ChurnTransform",
sagemaker_session=pipeline_session
)
step_transform = TransformStep(
name="ChurnTransform",
step_args=transformer.transform(
data=batch_data,
content_type="text/csv"
)
)
กำหนดขั้นตอนแบบจำลองการลงทะเบียน
โค้ดต่อไปนี้ลงทะเบียนโมเดลภายในรีจีสทรีโมเดล SageMaker โดยใช้ขั้นตอนโมเดลไปป์ไลน์:
model = Model(
image_uri=image_uri,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=pipeline_session,
role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json",
)
)
register_args = model.register(
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)
กำหนดขั้นตอนที่ล้มเหลวเพื่อหยุดไปป์ไลน์
รหัสต่อไปนี้กำหนดขั้นตอนความล้มเหลวของไปป์ไลน์เพื่อหยุดการทำงานของไปป์ไลน์โดยมีข้อความแสดงข้อผิดพลาดหากคะแนน AUC ไม่ตรงตามเกณฑ์ที่กำหนดไว้:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
name="ChurnAUCScoreFail",
error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
)
กำหนดขั้นตอนเงื่อนไขเพื่อตรวจสอบคะแนน AUC
รหัสต่อไปนี้กำหนดขั้นตอนเงื่อนไขเพื่อตรวจสอบคะแนน AUC และสร้างแบบจำลองตามเงื่อนไขและเรียกใช้การแปลงชุดงานและลงทะเบียนแบบจำลองในการลงทะเบียนแบบจำลอง หรือหยุดการทำงานของไปป์ไลน์ในสถานะที่ล้มเหลว:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
left=JsonGet(
step_name=step_eval.name,
property_file=evaluation_report,
json_path="classification_metrics.auc_score.value",
),
right=auc_score_threshold,
)
step_cond = ConditionStep(
name="CheckAUCScoreChurnEvaluation",
conditions=[cond_lte],
if_steps=[step_register, step_create_model, step_transform],
else_steps=[step_fail],
)
สร้างและเรียกใช้ไปป์ไลน์
หลังจากกำหนดขั้นตอนส่วนประกอบทั้งหมดแล้ว คุณสามารถประกอบเป็นวัตถุไปป์ไลน์ได้ คุณไม่จำเป็นต้องระบุลำดับของไปป์ไลน์เนื่องจากไปป์ไลน์จะอนุมานลำดับการสั่งซื้อโดยอัตโนมัติตามการขึ้นต่อกันระหว่างขั้นตอนต่างๆ
import json
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
processing_instance_type,
training_instance_type,
model_approval_status,
input_data,
batch_data,
auc_score_threshold,
],
steps=[step_process, step_train, step_eval, step_cond],
)
definition = json.loads(pipeline.definition())
print(definition)
เรียกใช้รหัสต่อไปนี้ในเซลล์ในสมุดบันทึกของคุณ หากไปป์ไลน์มีอยู่แล้ว รหัสจะอัพเดตไปป์ไลน์ หากไม่มีไปป์ไลน์ ระบบจะสร้างไพพ์ไลน์ใหม่
pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution
สรุป
ในโพสต์นี้ เราได้แนะนำคุณลักษณะใหม่บางอย่างที่ขณะนี้พร้อมใช้งานในไปป์ไลน์พร้อมกับฟีเจอร์ SageMaker ในตัวอื่นๆ และอัลกอริทึม XGBoost เพื่อพัฒนา ทำซ้ำ และปรับใช้โมเดลสำหรับการทำนายการปั่นป่วน โซลูชันสามารถขยายได้ด้วยแหล่งข้อมูลเพิ่มเติม
เพื่อใช้เวิร์กโฟลว์ ML ของคุณเอง สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับขั้นตอนที่มีอยู่ในเวิร์กโฟลว์ไปป์ไลน์ โปรดดูที่ ไปป์ไลน์การสร้างแบบจำลอง Amazon SageMaker และ เวิร์กโฟลว์ SageMaker. ตัวอย่าง AWS SageMaker GitHub repo มีตัวอย่างเพิ่มเติมเกี่ยวกับกรณีการใช้งานต่างๆ โดยใช้ไปป์ไลน์
เกี่ยวกับผู้เขียน
เจอร์รี่ เผิง เป็นวิศวกรพัฒนาซอฟต์แวร์กับ AWS SageMaker เขามุ่งเน้นที่การสร้างระบบ MLOps ขนาดใหญ่แบบครบวงจรตั้งแต่การฝึกอบรมไปจนถึงการตรวจสอบแบบจำลองในการผลิต เขายังหลงใหลในการนำแนวคิดของ MLOps ไปสู่ผู้ชมที่กว้างขึ้น
เต๋อเหวินฉี เป็นวิศวกรพัฒนาซอฟต์แวร์ใน AWS ปัจจุบันเธอมุ่งเน้นที่การพัฒนาและปรับปรุง SageMaker Pipelines นอกงาน เธอสนุกกับการฝึกเชลโล่
คยาตรี กานาโกตา เป็น Sr. Machine Learning Engineer พร้อม AWS Professional Services เธอหลงใหลในการพัฒนา ปรับใช้ และอธิบายโซลูชัน AI/ ML ในโดเมนต่างๆ ก่อนหน้าบทบาทนี้ เธอเป็นผู้นำความคิดริเริ่มหลายอย่างในฐานะนักวิทยาศาสตร์ข้อมูลและวิศวกร ML กับบริษัทชั้นนำระดับโลกในด้านการเงินและการค้าปลีก เธอสำเร็จการศึกษาระดับปริญญาโทด้านวิทยาการคอมพิวเตอร์ที่เชี่ยวชาญด้าน Data Science จากมหาวิทยาลัยโคโลราโด โบลเดอร์
รูพินเดอร์ กรีวาล เป็น Sr Ai/ML Specialist Solutions Architect กับ AWS ปัจจุบันเขามุ่งเน้นการให้บริการโมเดลและ MLOps บน SageMaker ก่อนหน้าจะรับตำแหน่งนี้ เขาเคยทำงานเป็นวิศวกรด้านการเรียนรู้ของเครื่องจักรในการสร้างและโฮสต์โมเดล นอกเวลางาน เขาชอบเล่นเทนนิสและขี่จักรยานบนเส้นทางบนภูเขา
เรย์ ลี่ เป็น Sr. Data Scientist ที่มี AWS Professional Services ความเชี่ยวชาญพิเศษของเขามุ่งเน้นไปที่การสร้างและดำเนินการโซลูชัน AI/ML สำหรับลูกค้าที่มีขนาดแตกต่างกัน ตั้งแต่สตาร์ทอัพไปจนถึงองค์กรระดับองค์กร นอกเวลางาน เรย์ชอบออกกำลังกายและเดินทาง