คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK

ท่อส่ง Amazon SageMaker อนุญาตให้นักวิทยาศาสตร์ข้อมูลและวิศวกรแมชชีนเลิร์นนิง (ML) กำหนดเวิร์กโฟลว์การฝึกอบรมโดยอัตโนมัติ ซึ่งช่วยให้คุณสร้างกระบวนการที่ทำซ้ำได้เพื่อควบคุมขั้นตอนการพัฒนาแบบจำลองสำหรับการทดลองอย่างรวดเร็วและการฝึกจำลองใหม่ คุณสามารถทำให้เวิร์กโฟลว์การสร้างแบบจำลองทั้งหมดเป็นแบบอัตโนมัติ รวมถึงการจัดเตรียมข้อมูล วิศวกรรมคุณลักษณะ การฝึกอบรมแบบจำลอง การปรับแต่งแบบจำลอง และการตรวจสอบแบบจำลอง และแค็ตตาล็อกในรีจีสทรีแบบจำลอง คุณสามารถกำหนดค่าไปป์ไลน์ให้ทำงานโดยอัตโนมัติตามช่วงเวลาปกติหรือเมื่อมีการทริกเกอร์เหตุการณ์บางอย่าง หรือคุณสามารถเรียกใช้ด้วยตนเองได้ตามต้องการ

ในโพสต์นี้ เราเน้นย้ำถึงการปรับปรุงบางอย่างของ อเมซอน SageMaker SDK และแนะนำคุณสมบัติใหม่ของ Amazon SageMaker Pipelines ที่ช่วยให้ผู้ปฏิบัติงาน ML สร้างและฝึกอบรมโมเดล ML ได้ง่ายขึ้น

Pipelines ยังคงสร้างสรรค์ประสบการณ์ของนักพัฒนาอย่างต่อเนื่อง และด้วยการเปิดตัวล่าสุดเหล่านี้ ตอนนี้คุณสามารถใช้บริการในรูปแบบที่กำหนดเองมากขึ้น:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – อัปเดตเอกสารเกี่ยวกับ PipelineVariable การใช้งานสำหรับคลาสพื้นฐานของตัวประมาณ โปรเซสเซอร์ จูนเนอร์ หม้อแปลงและโมเดล โมเดล Amazon และโมเดลเฟรมเวิร์ก จะมีการเปลี่ยนแปลงเพิ่มเติมที่มาพร้อมกับ SDK เวอร์ชันใหม่กว่าเพื่อรองรับคลาสย่อยทั้งหมดของตัวประมาณและตัวประมวลผล
  • 2.90.0 – ความพร้อมใช้งานของ รุ่นStep สำหรับงานสร้างทรัพยากรแบบจำลองและการลงทะเบียนแบบบูรณาการ
  • 2.88.2 – ความพร้อมใช้งานของ เซสชันไปป์ไลน์ สำหรับการโต้ตอบที่มีการจัดการกับเอนทิตีและทรัพยากรของ SageMaker
  • 2.88.2 – ความเข้ากันได้ของคลาสย่อยสำหรับ ขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ เพื่อให้คุณสามารถสร้างงานนามธรรมและกำหนดค่าและเรียกใช้การประมวลผล การฝึกอบรม การแปลง และการปรับแต่งงานได้เหมือนที่คุณทำโดยไม่มีไปป์ไลน์
  • 2.76.0 – ความพร้อมใช้งานของ ขั้นตอนล้มเหลว เพื่อหยุดไปป์ไลน์ที่มีสถานะล้มเหลวตามเงื่อนไข

ในโพสต์นี้ เราจะแนะนำคุณเกี่ยวกับเวิร์กโฟลว์โดยใช้ชุดข้อมูลตัวอย่างโดยเน้นที่การสร้างแบบจำลองและการปรับใช้เพื่อสาธิตวิธีการใช้คุณลักษณะใหม่ของไปป์ไลน์ ในตอนท้าย คุณควรมีข้อมูลเพียงพอที่จะใช้คุณลักษณะใหม่เหล่านี้ได้สำเร็จและทำให้ปริมาณงาน ML ของคุณง่ายขึ้น

ภาพรวมคุณสมบัติ

Pipelines นำเสนอคุณสมบัติใหม่ดังต่อไปนี้:

  • คำอธิบายประกอบตัวแปรไปป์ไลน์ – พารามิเตอร์เมธอดบางตัวยอมรับอินพุตหลายประเภท รวมถึง PipelineVariablesและมีการเพิ่มเอกสารเพิ่มเติมเพื่อชี้แจงว่า PipelineVariables ได้รับการสนับสนุนทั้งในเอกสาร SageMaker SDK เวอร์ชันเสถียรล่าสุดและลายเซ็นเริ่มต้นของฟังก์ชัน ตัวอย่างเช่น ในตัวประมาณค่า TensorFlow ต่อไปนี้ ลายเซ็นเริ่มต้นจะแสดงว่า model_dir และ image_uri สนับสนุน PipelineVariablesในขณะที่พารามิเตอร์อื่นไม่ทำ สำหรับข้อมูลเพิ่มเติม โปรดดูที่ เครื่องมือประมาณการเทนเซอร์โฟลว์.
    • ก่อน:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • หลังจากที่:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • เซสชั่นไปป์ไลน์ - เซสชันไปป์ไลน์ เป็นแนวคิดใหม่ที่นำมาใช้เพื่อนำความสามัคคีใน SageMaker SDK และแนะนำการเริ่มต้นใช้งานทรัพยากรไปป์ไลน์แบบขี้เกียจ ดิ PipelineSession บริบทสืบทอด SageMakerSession และใช้วิธีที่สะดวกสำหรับคุณในการโต้ตอบกับเอนทิตีและทรัพยากร SageMaker อื่นๆ เช่น งานฝึกอบรม จุดปลาย และชุดข้อมูลอินพุตที่จัดเก็บไว้ใน บริการจัดเก็บข้อมูลอย่างง่ายของ Amazon (อเมซอน เอส3).
  • ความเข้ากันได้ของคลาสย่อยกับขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ – ตอนนี้คุณสามารถสร้างนามธรรมของงานและกำหนดค่าและเรียกใช้การประมวลผล การฝึกอบรม การแปลง และการปรับแต่งงานได้ตามที่คุณต้องการโดยไม่ต้องใช้ไปป์ไลน์
    • ตัวอย่างเช่น การสร้างขั้นตอนการประมวลผลด้วย SKLearnProcessor ก่อนหน้านี้ต้องการสิ่งต่อไปนี้:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • ดังที่เราเห็นในรหัสก่อนหน้า ProcessingStep ต้องทำโดยพื้นฐานตรรกะการประมวลผลล่วงหน้าเช่นเดียวกับ .runโดยไม่ต้องเริ่มต้นการเรียก API เพื่อเริ่มงาน แต่ด้วยความเข้ากันได้ของคลาสย่อยในขณะนี้ที่เปิดใช้งานด้วยขั้นตอนงานไปป์ไลน์เวิร์กโฟลว์ เราประกาศ step_args อาร์กิวเมนต์ที่ใช้ตรรกะก่อนการประมวลผลด้วย .run เพื่อให้คุณสามารถสร้างสิ่งที่เป็นนามธรรมของงานและกำหนดค่าตามที่คุณต้องการใช้โดยไม่มีไปป์ไลน์ นอกจากนี้เรายังผ่านใน pipeline_sessionซึ่งเป็น PipelineSession วัตถุ แทน sagemaker_session เพื่อให้แน่ใจว่ามีการเรียกใช้การโทรแต่จะไม่ถูกเรียกจนกว่าจะมีการสร้างและเรียกใช้ไปป์ไลน์ ดูรหัสต่อไปนี้:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • ขั้นตอนแบบจำลอง (แนวทางที่คล่องตัวด้วยขั้นตอนการสร้างแบบจำลองและการลงทะเบียน) –Pipelines มีขั้นตอนสองประเภทเพื่อรวมเข้ากับรุ่น SageMaker: CreateModelStep และ RegisterModel. ตอนนี้คุณสามารถบรรลุทั้งสองได้โดยใช้เพียง ModelStep พิมพ์. สังเกตว่า a PipelineSession จำเป็นเพื่อให้บรรลุสิ่งนี้ สิ่งนี้ทำให้เกิดความคล้ายคลึงกันระหว่างขั้นตอนไปป์ไลน์และ SDK
    • ก่อน:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • หลังจากที่:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • ขั้นตอนที่ล้มเหลว (หยุดตามเงื่อนไขของการรันไปป์ไลน์) - FailStep อนุญาตให้ไปป์ไลน์หยุดทำงานโดยมีสถานะล้มเหลวหากตรงตามเงื่อนไข เช่น หากคะแนนโมเดลต่ำกว่าเกณฑ์ที่กำหนด

ภาพรวมโซลูชัน

ในโซลูชันนี้ จุดเริ่มต้นของคุณคือ สตูดิโอ Amazon SageMaker สภาพแวดล้อมการพัฒนาแบบบูรณาการ (IDE) สำหรับการทดลองอย่างรวดเร็ว Studio เสนอสภาพแวดล้อมในการจัดการประสบการณ์ไปป์ไลน์แบบ end-to-end ด้วย Studio คุณสามารถเลี่ยง คอนโซลการจัดการ AWS สำหรับการจัดการเวิร์กโฟลว์ทั้งหมดของคุณ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการจัดการไปป์ไลน์จากภายใน Studio โปรดดูที่ ดู ติดตาม และดำเนินการไปป์ไลน์ SageMaker ใน SageMaker Studio.

ไดอะแกรมต่อไปนี้แสดงสถาปัตยกรรมระดับสูงของเวิร์กโฟลว์ ML พร้อมขั้นตอนต่างๆ ในการฝึกและสร้างการอนุมานโดยใช้คุณลักษณะใหม่

ไปป์ไลน์มีขั้นตอนต่อไปนี้:

  1. ประมวลผลข้อมูลล่วงหน้าเพื่อสร้างคุณสมบัติที่จำเป็นและแยกข้อมูลออกเป็นชุดข้อมูลฝึก การตรวจสอบ และทดสอบ
  2. สร้างงานการฝึกอบรมด้วยเฟรมเวิร์ก SageMaker XGBoost
  3. ประเมินแบบจำลองที่ได้รับการฝึกอบรมโดยใช้ชุดข้อมูลทดสอบ
  4. ตรวจสอบว่าคะแนน AUC สูงกว่าเกณฑ์ที่กำหนดไว้ล่วงหน้าหรือไม่
    • หากคะแนน AUC น้อยกว่าเกณฑ์ ให้หยุดการทำงานของไปป์ไลน์และทำเครื่องหมายว่าล้มเหลว
    • หากคะแนน AUC มากกว่าเกณฑ์ ให้สร้างโมเดล SageMaker และลงทะเบียนในรีจีสทรีโมเดล SageMaker
  5. ใช้การแปลงเป็นชุดในชุดข้อมูลที่กำหนดโดยใช้แบบจำลองที่สร้างในขั้นตอนก่อนหน้า

เบื้องต้น

เพื่อติดตามโพสต์นี้ คุณต้องมีบัญชี AWS ที่มี a โดเมนสตูดิโอ.

Pipelines ถูกรวมเข้ากับเอนทิตีและทรัพยากรของ SageMaker โดยตรง ดังนั้นคุณไม่จำเป็นต้องโต้ตอบกับบริการอื่นๆ ของ AWS คุณไม่จำเป็นต้องจัดการทรัพยากรใดๆ เนื่องจากเป็นบริการที่มีการจัดการเต็มรูปแบบ ซึ่งหมายความว่าจะสร้างและจัดการทรัพยากรให้กับคุณ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับส่วนประกอบ SageMaker ต่างๆ ที่เป็นทั้ง Python API แบบสแตนด์อโลนพร้อมกับส่วนประกอบที่ผสานรวมของ Studio ให้ดูที่ หน้าผลิตภัณฑ์ SageMaker.

ก่อนเริ่มต้น ให้ติดตั้ง SageMaker SDK เวอร์ชัน >= 2.104.0 และ xlrd >=1.0.0 ภายในโน้ตบุ๊ก Studio โดยใช้ข้อมูลโค้ดต่อไปนี้:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML เวิร์กโฟลว์

สำหรับโพสต์นี้ คุณใช้ส่วนประกอบต่อไปนี้:

  • การเตรียมข้อมูล
    • การประมวลผล SageMaker – การประมวลผล SageMaker เป็นบริการที่มีการจัดการเต็มรูปแบบ ซึ่งช่วยให้คุณสามารถเรียกใช้การแปลงข้อมูลแบบกำหนดเองและวิศวกรรมคุณลักษณะสำหรับปริมาณงาน ML
  • การสร้างแบบจำลอง
  • การฝึกแบบจำลองและการประเมิน
    • การฝึกอบรมในคลิกเดียว – SageMaker แจกจ่ายคุณสมบัติการฝึกอบรม SageMaker จัดเตรียมไลบรารีการฝึกอบรมแบบกระจายสำหรับการขนานข้อมูลและความเท่าเทียมของแบบจำลอง ไลบรารีได้รับการปรับให้เหมาะสมสำหรับสภาพแวดล้อมการฝึกอบรม SageMaker ช่วยปรับงานการฝึกอบรมแบบกระจายไปยัง SageMaker และปรับปรุงความเร็วในการฝึกอบรมและปริมาณงาน
    • การทดลอง SageMaker – Experiments เป็นความสามารถของ SageMaker ที่ให้คุณจัดระเบียบ ติดตาม เปรียบเทียบ และประเมินการวนซ้ำ ML ของคุณ
    • การแปลงแบทช์ SageMaker – การแปลงแบทช์หรือการให้คะแนนแบบออฟไลน์เป็นบริการที่มีการจัดการใน SageMaker ที่ให้คุณคาดการณ์ชุดข้อมูลขนาดใหญ่ขึ้นโดยใช้โมเดล ML ของคุณ
  • การประสานเวิร์กโฟลว์

ไปป์ไลน์ SageMaker คือชุดของขั้นตอนที่เชื่อมต่อถึงกันซึ่งกำหนดโดยคำจำกัดความไปป์ไลน์ JSON มันเข้ารหัสไปป์ไลน์โดยใช้กราฟ acyclic กำกับ (DAG) DAG ให้ข้อมูลเกี่ยวกับข้อกำหนดและความสัมพันธ์ระหว่างแต่ละขั้นตอนของไปป์ไลน์ และโครงสร้างถูกกำหนดโดยการอ้างอิงข้อมูลระหว่างขั้นตอนต่างๆ การขึ้นต่อกันเหล่านี้ถูกสร้างขึ้นเมื่อคุณสมบัติของเอาต์พุตของขั้นตอนถูกส่งผ่านเป็นอินพุตไปยังอีกขั้นตอนหนึ่ง

ไดอะแกรมต่อไปนี้แสดงขั้นตอนต่างๆ ในไปป์ไลน์ SageMaker (สำหรับกรณีการใช้งานการคาดการณ์แบบปั่นป่วน) ซึ่ง SageMaker อนุมานการเชื่อมต่อระหว่างขั้นตอนต่างๆ ตามอินพุตและเอาต์พุตที่กำหนดโดยข้อกำหนดขั้นตอน

ส่วนถัดไปจะอธิบายเกี่ยวกับการสร้างแต่ละขั้นตอนของไปป์ไลน์และเรียกใช้ไปป์ไลน์ทั้งหมดเมื่อสร้างเสร็จ

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.

โครงสร้างโครงการ

เริ่มจากโครงสร้างโครงการกันก่อน:

  • / sm-pipelines-end-to-end-example – ชื่อโครงการ
    • /ข้อมูล – ชุดข้อมูล
    • /ท่อส่ง – ไฟล์โค้ดสำหรับส่วนประกอบไปป์ไลน์
      • /ลูกค้ากำลังจะเปลี่ยนใจจากคุณ
        • preprocess.py
        • ประเมิน.py
    • sagemaker-ไปป์ไลน์-project.ipynb – โน้ตบุ๊กที่เดินผ่านเวิร์กโฟลว์การสร้างแบบจำลองโดยใช้คุณสมบัติใหม่ของ Pipelines

ดาวน์โหลดชุดข้อมูล

หากต้องการติดตามบทความนี้ คุณต้องดาวน์โหลดและบันทึกไฟล์ ชุดข้อมูลตัวอย่าง ภายใต้โฟลเดอร์ data ภายในโฮมไดเร็กทอรีของโครงการ ซึ่งบันทึกไฟล์ใน ระบบไฟล์ Amazon Elastic (Amazon EFS) ภายในสภาพแวดล้อมของ Studio

สร้างส่วนประกอบไปป์ไลน์

ตอนนี้คุณพร้อมที่จะสร้างส่วนประกอบไปป์ไลน์แล้ว

นำเข้าคำสั่งและประกาศพารามิเตอร์และค่าคงที่

สร้างสมุดบันทึก Studio ชื่อ sagemaker-pipelines-project.ipynb ภายในโฮมไดเร็กทอรีของโครงการ ป้อนบล็อกโค้ดต่อไปนี้ในเซลล์ และเรียกใช้เซลล์เพื่อตั้งค่าออบเจ็กต์ไคลเอ็นต์ SageMaker และ S3 สร้าง PipelineSessionและตั้งค่าตำแหน่งบัคเก็ต S3 โดยใช้บัคเก็ตเริ่มต้นที่มาพร้อมกับเซสชัน SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

ไปป์ไลน์รองรับการกำหนดพารามิเตอร์ ซึ่งช่วยให้คุณระบุพารามิเตอร์อินพุตขณะรันไทม์โดยไม่ต้องเปลี่ยนโค้ดไปป์ไลน์ คุณสามารถใช้โมดูลที่มีอยู่ภายใต้ sagemaker.workflow.parameters โมดูล เช่น ParameterInteger, ParameterFloatและ ParameterStringเพื่อระบุพารามิเตอร์ไปป์ไลน์ของประเภทข้อมูลต่างๆ รันโค้ดต่อไปนี้เพื่อตั้งค่าพารามิเตอร์อินพุตหลายตัว:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

สร้างชุดข้อมูลแบทช์

สร้างชุดข้อมูลชุดงาน ซึ่งคุณใช้ภายหลังในขั้นตอนการแปลงชุดงาน:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

อัปโหลดข้อมูลไปยังบัคเก็ต S3

อัปโหลดชุดข้อมูลไปยัง Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

กำหนดสคริปต์การประมวลผลและขั้นตอนการประมวลผล

ในขั้นตอนนี้ คุณต้องเตรียมสคริปต์ Python เพื่อทำวิศวกรรมคุณลักษณะ การเข้ารหัสแบบด่วนหนึ่งครั้ง และดูแลจัดการการฝึกอบรม การตรวจสอบ และการแยกการทดสอบเพื่อใช้สำหรับการสร้างแบบจำลอง รันโค้ดต่อไปนี้เพื่อสร้างสคริปต์การประมวลผลของคุณ:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

ถัดไป เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างอินสแตนซ์ตัวประมวลผล และขั้นตอนไปป์ไลน์เพื่อเรียกใช้สคริปต์การประมวลผล เนื่องจากสคริปต์การประมวลผลเขียนด้วย Pandas คุณจึงใช้ a SKLearn โปรเซสเซอร์. ท่อส่ง ProcessingStep ฟังก์ชันรับอาร์กิวเมนต์ต่อไปนี้: ตัวประมวลผล ตำแหน่งอินพุต S3 สำหรับชุดข้อมูลดิบ และตำแหน่งเอาต์พุต S3 เพื่อบันทึกชุดข้อมูลที่ประมวลผล

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

กำหนดขั้นตอนการฝึก

ตั้งค่าการฝึกโมเดลโดยใช้ตัวประมาณ SageMaker XGBoost และไปป์ไลน์ TrainingStep ฟังก์ชั่น:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

กำหนดสคริปต์การประเมินและขั้นตอนการประเมินแบบจำลอง

เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อประเมินโมเดลเมื่อฝึกแล้ว สคริปต์นี้สรุปตรรกะเพื่อตรวจสอบว่าคะแนน AUC ตรงตามเกณฑ์ที่ระบุหรือไม่

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

ถัดไป เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างอินสแตนซ์ตัวประมวลผล และขั้นตอนไปป์ไลน์เพื่อเรียกใช้สคริปต์การประเมิน เนื่องจากสคริปต์การประเมินใช้แพ็คเกจ XGBoost คุณจึงใช้ a ScriptProcessor พร้อมกับภาพ XGBoost ท่อส่ง ProcessingStep ฟังก์ชันรับอาร์กิวเมนต์ต่อไปนี้: ตัวประมวลผล ตำแหน่งอินพุต S3 สำหรับชุดข้อมูลดิบ และตำแหน่งเอาต์พุต S3 เพื่อบันทึกชุดข้อมูลที่ประมวลผล

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

กำหนดขั้นตอนการสร้างแบบจำลอง

เรียกใช้บล็อกโค้ดต่อไปนี้เพื่อสร้างโมเดล SageMaker โดยใช้ขั้นตอนโมเดลไปป์ไลน์ ขั้นตอนนี้ใช้ผลลัพธ์ของขั้นตอนการฝึกอบรมเพื่อจัดแพ็คเกจโมเดลสำหรับการปรับใช้ โปรดทราบว่าค่าสำหรับอาร์กิวเมนต์ประเภทอินสแตนซ์จะถูกส่งโดยใช้พารามิเตอร์ไปป์ไลน์ที่คุณกำหนดไว้ก่อนหน้าในโพสต์

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

กำหนดขั้นตอนการแปลงแบทช์

รันบล็อคโค้ดต่อไปนี้เพื่อรันการแปลงแบบแบตช์โดยใช้โมเดลที่ได้รับการฝึกด้วยอินพุตแบทช์ที่สร้างขึ้นในขั้นตอนแรก:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

กำหนดขั้นตอนแบบจำลองการลงทะเบียน

โค้ดต่อไปนี้ลงทะเบียนโมเดลภายในรีจีสทรีโมเดล SageMaker โดยใช้ขั้นตอนโมเดลไปป์ไลน์:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

กำหนดขั้นตอนที่ล้มเหลวเพื่อหยุดไปป์ไลน์

รหัสต่อไปนี้กำหนดขั้นตอนความล้มเหลวของไปป์ไลน์เพื่อหยุดการทำงานของไปป์ไลน์โดยมีข้อความแสดงข้อผิดพลาดหากคะแนน AUC ไม่ตรงตามเกณฑ์ที่กำหนดไว้:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

กำหนดขั้นตอนเงื่อนไขเพื่อตรวจสอบคะแนน AUC

รหัสต่อไปนี้กำหนดขั้นตอนเงื่อนไขเพื่อตรวจสอบคะแนน AUC และสร้างแบบจำลองตามเงื่อนไขและเรียกใช้การแปลงชุดงานและลงทะเบียนแบบจำลองในการลงทะเบียนแบบจำลอง หรือหยุดการทำงานของไปป์ไลน์ในสถานะที่ล้มเหลว:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

สร้างและเรียกใช้ไปป์ไลน์

หลังจากกำหนดขั้นตอนส่วนประกอบทั้งหมดแล้ว คุณสามารถประกอบเป็นวัตถุไปป์ไลน์ได้ คุณไม่จำเป็นต้องระบุลำดับของไปป์ไลน์เนื่องจากไปป์ไลน์จะอนุมานลำดับการสั่งซื้อโดยอัตโนมัติตามการขึ้นต่อกันระหว่างขั้นตอนต่างๆ

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

เรียกใช้รหัสต่อไปนี้ในเซลล์ในสมุดบันทึกของคุณ หากไปป์ไลน์มีอยู่แล้ว รหัสจะอัพเดตไปป์ไลน์ หากไม่มีไปป์ไลน์ ระบบจะสร้างไพพ์ไลน์ใหม่

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

สรุป

ในโพสต์นี้ เราได้แนะนำคุณลักษณะใหม่บางอย่างที่ขณะนี้พร้อมใช้งานในไปป์ไลน์พร้อมกับฟีเจอร์ SageMaker ในตัวอื่นๆ และอัลกอริทึม XGBoost เพื่อพัฒนา ทำซ้ำ และปรับใช้โมเดลสำหรับการทำนายการปั่นป่วน โซลูชันสามารถขยายได้ด้วยแหล่งข้อมูลเพิ่มเติม

เพื่อใช้เวิร์กโฟลว์ ML ของคุณเอง สำหรับรายละเอียดเพิ่มเติมเกี่ยวกับขั้นตอนที่มีอยู่ในเวิร์กโฟลว์ไปป์ไลน์ โปรดดูที่ ไปป์ไลน์การสร้างแบบจำลอง Amazon SageMaker และ เวิร์กโฟลว์ SageMaker. ตัวอย่าง AWS SageMaker GitHub repo มีตัวอย่างเพิ่มเติมเกี่ยวกับกรณีการใช้งานต่างๆ โดยใช้ไปป์ไลน์


เกี่ยวกับผู้เขียน

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.เจอร์รี่ เผิง เป็นวิศวกรพัฒนาซอฟต์แวร์กับ AWS SageMaker เขามุ่งเน้นที่การสร้างระบบ MLOps ขนาดใหญ่แบบครบวงจรตั้งแต่การฝึกอบรมไปจนถึงการตรวจสอบแบบจำลองในการผลิต เขายังหลงใหลในการนำแนวคิดของ MLOps ไปสู่ผู้ชมที่กว้างขึ้น

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.เต๋อเหวินฉี เป็นวิศวกรพัฒนาซอฟต์แวร์ใน AWS ปัจจุบันเธอมุ่งเน้นที่การพัฒนาและปรับปรุง SageMaker Pipelines นอกงาน เธอสนุกกับการฝึกเชลโล่

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.คยาตรี กานาโกตา เป็น Sr. Machine Learning Engineer พร้อม AWS Professional Services เธอหลงใหลในการพัฒนา ปรับใช้ และอธิบายโซลูชัน AI/ ML ในโดเมนต่างๆ ก่อนหน้าบทบาทนี้ เธอเป็นผู้นำความคิดริเริ่มหลายอย่างในฐานะนักวิทยาศาสตร์ข้อมูลและวิศวกร ML กับบริษัทชั้นนำระดับโลกในด้านการเงินและการค้าปลีก เธอสำเร็จการศึกษาระดับปริญญาโทด้านวิทยาการคอมพิวเตอร์ที่เชี่ยวชาญด้าน Data Science จากมหาวิทยาลัยโคโลราโด โบลเดอร์

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.รูพินเดอร์ กรีวาล เป็น Sr Ai/ML Specialist Solutions Architect กับ AWS ปัจจุบันเขามุ่งเน้นการให้บริการโมเดลและ MLOps บน SageMaker ก่อนหน้าจะรับตำแหน่งนี้ เขาเคยทำงานเป็นวิศวกรด้านการเรียนรู้ของเครื่องจักรในการสร้างและโฮสต์โมเดล นอกเวลางาน เขาชอบเล่นเทนนิสและขี่จักรยานบนเส้นทางบนภูเขา

คุณสมบัติใหม่สำหรับ Amazon SageMaker Pipelines และ Amazon SageMaker SDK PlatoBlockchain Data Intelligence ค้นหาแนวตั้ง AI.เรย์ ลี่ เป็น Sr. Data Scientist ที่มี AWS Professional Services ความเชี่ยวชาญพิเศษของเขามุ่งเน้นไปที่การสร้างและดำเนินการโซลูชัน AI/ML สำหรับลูกค้าที่มีขนาดแตกต่างกัน ตั้งแต่สตาร์ทอัพไปจนถึงองค์กรระดับองค์กร นอกเวลางาน เรย์ชอบออกกำลังกายและเดินทาง

ประทับเวลา:

เพิ่มเติมจาก AWS Machine Learning AWS