अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके के लिए नई सुविधाएँ

अमेज़न SageMaker पाइपलाइन डेटा वैज्ञानिकों और मशीन लर्निंग (एमएल) इंजीनियरों को प्रशिक्षण वर्कफ़्लो को स्वचालित करने की अनुमति देता है, जो आपको तेजी से प्रयोग और मॉडल पुनर्प्रशिक्षण के लिए मॉडल विकास चरणों को व्यवस्थित करने के लिए एक दोहराने योग्य प्रक्रिया बनाने में मदद करता है। आप डेटा तैयार करने, फीचर इंजीनियरिंग, मॉडल प्रशिक्षण, मॉडल ट्यूनिंग और मॉडल सत्यापन सहित संपूर्ण मॉडल बिल्ड वर्कफ़्लो को स्वचालित कर सकते हैं और इसे मॉडल रजिस्ट्री में सूचीबद्ध कर सकते हैं। आप पाइपलाइनों को नियमित अंतराल पर स्वचालित रूप से चलाने के लिए कॉन्फ़िगर कर सकते हैं या जब कुछ ईवेंट ट्रिगर होते हैं, या आप उन्हें आवश्यकतानुसार मैन्युअल रूप से चला सकते हैं।

इस पोस्ट में, हम कुछ संवर्द्धन पर प्रकाश डालते हैं अमेज़न SageMaker एसडीके और अमेज़ॅन सेजमेकर पाइपलाइनों की नई विशेषताएं पेश करें जो एमएल प्रैक्टिशनरों के लिए एमएल मॉडल बनाने और प्रशिक्षित करने में आसान बनाती हैं।

पाइपलाइन अपने डेवलपर अनुभव को नया करना जारी रखती है, और इन हालिया रिलीज के साथ, अब आप सेवा का अधिक अनुकूलित तरीके से उपयोग कर सकते हैं:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 - अद्यतन दस्तावेज़ीकरण PipelineVariable अनुमानक, प्रोसेसर, ट्यूनर, ट्रांसफॉर्मर, और मॉडल बेस क्लास, अमेज़ॅन मॉडल और फ्रेमवर्क मॉडल के लिए उपयोग। अनुमानकों और प्रोसेसर के सभी उपवर्गों का समर्थन करने के लिए एसडीके के नए संस्करणों के साथ अतिरिक्त बदलाव आने वाले हैं।
  • 2.90.0 - की उपलब्धता मॉडलस्टेप एकीकृत मॉडल संसाधन निर्माण और पंजीकरण कार्यों के लिए।
  • 2.88.2 - की उपलब्धता पाइपलाइन सत्र सेजमेकर संस्थाओं और संसाधनों के साथ प्रबंधित बातचीत के लिए।
  • 2.88.2 - के लिए उपवर्ग अनुकूलता कार्यप्रवाह पाइपलाइन कार्य चरण ताकि आप जॉब एब्स्ट्रैक्शन बना सकें और प्रोसेसिंग, ट्रेनिंग, ट्रांसफॉर्म और ट्यूनिंग जॉब को कॉन्फ़िगर और चला सकें, जैसा कि आप बिना पाइपलाइन के करेंगे।
  • 2.76.0 - की उपलब्धता फ़ेलस्टेप एक विफलता स्थिति के साथ एक पाइपलाइन को सशर्त रूप से रोकने के लिए।

इस पोस्ट में, हम आपको नमूना डेटासेट का उपयोग करके वर्कफ़्लो के माध्यम से चलते हैं, जिसमें मॉडल निर्माण और परिनियोजन पर ध्यान केंद्रित किया जाता है ताकि यह प्रदर्शित किया जा सके कि पाइपलाइनों की नई सुविधाओं को कैसे लागू किया जाए। अंत तक, आपके पास इन नई सुविधाओं का सफलतापूर्वक उपयोग करने और अपने एमएल वर्कलोड को सरल बनाने के लिए पर्याप्त जानकारी होनी चाहिए।

सुविधाएँ अवलोकन

पाइपलाइन निम्नलिखित नई सुविधाएँ प्रदान करती है:

  • पाइपलाइन चर एनोटेशन - कुछ विधि पैरामीटर कई इनपुट प्रकारों को स्वीकार करते हैं, जिनमें शामिल हैं PipelineVariables, और अतिरिक्त दस्तावेज़ीकरण जोड़ा गया है ताकि स्पष्ट किया जा सके कि कहाँ PipelineVariables सेजमेकर एसडीके प्रलेखन के नवीनतम स्थिर संस्करण और कार्यों के इनिट हस्ताक्षर दोनों में समर्थित हैं। उदाहरण के लिए, निम्नलिखित TensorFlow अनुमानक में, init हस्ताक्षर अब दिखाता है कि model_dir और image_uri समर्थन PipelineVariables, जबकि अन्य पैरामीटर नहीं हैं। अधिक जानकारी के लिए देखें टेंसरफ्लो अनुमानक.
    • पहले:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • बाद:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • पाइपलाइन सत्र - पाइपलाइन सत्र सेजमेकर एसडीके में एकता लाने के लिए शुरू की गई एक नई अवधारणा है और पाइपलाइन संसाधनों के आलसी आरंभीकरण का परिचय देती है (रन कॉल कैप्चर की जाती हैं लेकिन पाइपलाइन बनने और चलने तक नहीं चलती हैं)। PipelineSession प्रसंग इनहेरिट करता है SageMakerSession और आपके लिए अन्य सेजमेकर संस्थाओं और संसाधनों के साथ बातचीत करने के लिए सुविधाजनक तरीके लागू करता है, जैसे प्रशिक्षण कार्य, समापन बिंदु, और में संग्रहीत इनपुट डेटासेट अमेज़न सरल भंडारण सेवा (अमेज़न S3)।
  • वर्कफ़्लो पाइपलाइन कार्य चरणों के साथ उपवर्ग संगतता - अब आप जॉब एब्स्ट्रैक्शन बना सकते हैं और प्रोसेसिंग, ट्रेनिंग, ट्रांसफॉर्म और ट्यूनिंग जॉब को कॉन्फ़िगर और चला सकते हैं जैसे आप बिना पाइपलाइन के करेंगे।
    • उदाहरण के लिए, के साथ एक प्रसंस्करण चरण बनाना SKLearnProcessor पहले निम्नलिखित की आवश्यकता थी:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • जैसा कि हम पिछले कोड में देखते हैं, ProcessingStep मूल रूप से वही प्रीप्रोसेसिंग तर्क करने की आवश्यकता है .run, काम शुरू करने के लिए एपीआई कॉल शुरू किए बिना। लेकिन उपवर्ग संगतता के साथ अब कार्यप्रवाह पाइपलाइन कार्य चरणों के साथ सक्षम किया गया है, हम घोषित करते हैं step_args तर्क जो प्रीप्रोसेसिंग तर्क को .run के साथ लेता है ताकि आप नौकरी अमूर्त बना सकें और इसे कॉन्फ़िगर कर सकें क्योंकि आप इसे पाइपलाइनों के बिना उपयोग करेंगे। हम भी में गुजरते हैं pipeline_sessionहै, जो एक है PipelineSession वस्तु, के बजाय sagemaker_session यह सुनिश्चित करने के लिए कि रन कॉल कैप्चर की जाती हैं लेकिन पाइपलाइन बनने और चलने तक कॉल नहीं की जाती हैं। निम्नलिखित कोड देखें:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • मॉडल चरण (मॉडल निर्माण और पंजीकरण चरणों के साथ एक सुव्यवस्थित दृष्टिकोण) -पाइपलाइन सेजमेकर मॉडल के साथ एकीकृत करने के लिए दो चरण प्रकार प्रदान करता है: CreateModelStep और RegisterModel. अब आप केवल का उपयोग करके दोनों को प्राप्त कर सकते हैं ModelStep प्रकार। ध्यान दें कि ए PipelineSession इसे हासिल करने के लिए आवश्यक है। यह पाइपलाइन चरणों और एसडीके के बीच समानता लाता है।
    • पहले:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • बाद:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • विफल चरण (पाइपलाइन चलाने की सशर्त रोक) - FailStep यदि कोई शर्त पूरी होती है तो पाइपलाइन को विफलता की स्थिति के साथ रोकने की अनुमति देता है, जैसे कि मॉडल स्कोर एक निश्चित सीमा से नीचे है।

समाधान अवलोकन

इस समाधान में, आपका प्रवेश बिंदु है अमेज़ॅन सैजमेकर स्टूडियो तेजी से प्रयोग के लिए एकीकृत विकास पर्यावरण (आईडीई)। स्टूडियो एंड-टू-एंड पाइपलाइन अनुभव को प्रबंधित करने के लिए एक वातावरण प्रदान करता है। स्टूडियो के साथ, आप इसे बायपास कर सकते हैं एडब्ल्यूएस प्रबंधन कंसोल आपके संपूर्ण वर्कफ़्लो प्रबंधन के लिए। स्टूडियो के भीतर से पाइपलाइनों के प्रबंधन के बारे में अधिक जानकारी के लिए देखें सेजमेकर स्टूडियो में सेजमेकर पाइपलाइन देखें, ट्रैक करें और निष्पादित करें.

निम्नलिखित आरेख नई सुविधाओं का उपयोग करके प्रशिक्षण और अनुमान उत्पन्न करने के लिए विभिन्न चरणों के साथ एमएल वर्कफ़्लो के उच्च-स्तरीय आर्किटेक्चर को दिखाता है।

पाइपलाइन में निम्नलिखित चरण शामिल हैं:

  1. आवश्यक सुविधाओं के निर्माण के लिए डेटा को प्रीप्रोसेस करें और डेटा को ट्रेन, सत्यापन और परीक्षण डेटासेट में विभाजित करें।
  2. SageMaker XGBoost फ्रेमवर्क के साथ एक प्रशिक्षण कार्य बनाएँ।
  3. परीक्षण डेटासेट का उपयोग करके प्रशिक्षित मॉडल का मूल्यांकन करें।
  4. जांचें कि क्या एयूसी स्कोर पूर्वनिर्धारित सीमा से ऊपर है।
    • यदि एयूसी स्कोर थ्रेशोल्ड से कम है, तो पाइपलाइन चलाना बंद करें और इसे विफल के रूप में चिह्नित करें।
    • यदि एयूसी स्कोर थ्रेशोल्ड से अधिक है, तो सेजमेकर मॉडल बनाएं और इसे सेजमेकर मॉडल रजिस्ट्री में पंजीकृत करें।
  5. पिछले चरण में बनाए गए मॉडल का उपयोग करके दिए गए डेटासेट पर बैच परिवर्तन लागू करें।

.. पूर्वापेक्षाएँ

इस पोस्ट के साथ अनुसरण करने के लिए, आपको a . के साथ एक AWS खाते की आवश्यकता है स्टूडियो डोमेन.

पाइपलाइन सीधे सेजमेकर संस्थाओं और संसाधनों के साथ एकीकृत है, इसलिए आपको किसी अन्य एडब्ल्यूएस सेवाओं के साथ बातचीत करने की आवश्यकता नहीं है। आपको किसी संसाधन का प्रबंधन करने की भी आवश्यकता नहीं है क्योंकि यह पूरी तरह से प्रबंधित सेवा है, जिसका अर्थ है कि यह आपके लिए संसाधनों का निर्माण और प्रबंधन करती है। विभिन्न सेजमेकर घटकों के बारे में अधिक जानकारी के लिए, जो स्टूडियो के एकीकृत घटकों के साथ-साथ स्टैंडअलोन पायथन एपीआई दोनों हैं, देखें सेजमेकर उत्पाद पृष्ठ.

आरंभ करने से पहले, निम्नलिखित कोड स्निपेट का उपयोग करके स्टूडियो नोटबुक में सेजमेकर एसडीके संस्करण>= 2.104.0 और xlrd>=1.0.0 स्थापित करें:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

एमएल वर्कफ़्लो

इस पोस्ट के लिए, आप निम्नलिखित घटकों का उपयोग करते हैं:

  • डेटा तैयारी
    • SageMaker प्रसंस्करण - सेजमेकर प्रोसेसिंग एक पूरी तरह से प्रबंधित सेवा है जो आपको एमएल वर्कलोड के लिए कस्टम डेटा ट्रांसफॉर्मेशन और फीचर इंजीनियरिंग चलाने की अनुमति देती है।
  • प्रतिरूप निर्माण
  • मॉडल प्रशिक्षण और मूल्यांकन
    • एक-क्लिक प्रशिक्षण - सेजमेकर ने प्रशिक्षण सुविधा वितरित की। सेजमेकर डेटा समानता और मॉडल समानता के लिए वितरित प्रशिक्षण पुस्तकालय प्रदान करता है। पुस्तकालयों को सेजमेकर प्रशिक्षण वातावरण के लिए अनुकूलित किया गया है, आपके वितरित प्रशिक्षण कार्यों को सेजमेकर के अनुकूल बनाने में मदद करता है, और प्रशिक्षण की गति और थ्रूपुट में सुधार करता है।
    • SageMaker प्रयोग - प्रयोग सेजमेकर की एक क्षमता है जो आपको अपने एमएल पुनरावृत्तियों को व्यवस्थित, ट्रैक, तुलना और मूल्यांकन करने देती है।
    • SageMaker बैच परिवर्तन - बैच ट्रांसफॉर्म या ऑफलाइन स्कोरिंग सेजमेकर में एक प्रबंधित सेवा है जो आपको अपने एमएल मॉडल का उपयोग करके बड़े डेटासेट पर भविष्यवाणी करने देती है।
  • वर्कफ़्लो ऑर्केस्ट्रेशन

एक सेजमेकर पाइपलाइन एक JSON पाइपलाइन परिभाषा द्वारा परिभाषित इंटरकनेक्टेड चरणों की एक श्रृंखला है। यह एक निर्देशित एसाइक्लिक ग्राफ (DAG) का उपयोग करके एक पाइपलाइन को एन्कोड करता है। DAG पाइपलाइन के प्रत्येक चरण के लिए आवश्यकताओं और संबंधों के बारे में जानकारी देता है, और इसकी संरचना चरणों के बीच डेटा निर्भरता द्वारा निर्धारित की जाती है। ये निर्भरताएँ तब बनती हैं जब किसी चरण के आउटपुट के गुण दूसरे चरण के इनपुट के रूप में पास किए जाते हैं।

निम्नलिखित आरेख सेजमेकर पाइपलाइन (मंथन भविष्यवाणी उपयोग के मामले के लिए) में विभिन्न चरणों को दिखाता है, जहां चरणों के बीच के कनेक्शन को सेजमेकर द्वारा चरण परिभाषाओं द्वारा परिभाषित इनपुट और आउटपुट के आधार पर अनुमानित किया जाता है।

अगले खंड पाइपलाइन के प्रत्येक चरण को बनाने और एक बार बनाई गई पूरी पाइपलाइन को चलाने के माध्यम से चलते हैं।

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.

परियोजना की संरचना

आइए परियोजना संरचना से शुरू करें:

  • /एसएम-पाइपलाइन्स-एंड-टू-एंड-उदाहरण - परियोजना का नाम
    • /डेटा - डेटासेट
    • /पाइपलाइन - पाइपलाइन घटकों के लिए कोड फ़ाइलें
      • /ग्राहक मंथन
        • प्रीप्रोसेस.py
        • मूल्यांकन.py
    • सेजमेकर-पाइपलाइन-प्रोजेक्ट.ipynb - पाइपलाइन की नई सुविधाओं का उपयोग करके मॉडलिंग वर्कफ़्लो के माध्यम से चलने वाली एक नोटबुक

डेटासेट डाउनलोड करें

इस पोस्ट का अनुसरण करने के लिए, आपको डाउनलोड करने और सहेजने की आवश्यकता है नमूना डेटासेट प्रोजेक्ट होम निर्देशिका के भीतर डेटा फ़ोल्डर के अंतर्गत, जो फ़ाइल को सहेजता है अमेज़ॅन इलास्टिक फ़ाइल सिस्टम (अमेज़ॅन ईएफएस) स्टूडियो पर्यावरण के भीतर।

पाइपलाइन घटकों का निर्माण

अब आप पाइपलाइन घटकों के निर्माण के लिए तैयार हैं।

बयान आयात करें और पैरामीटर और स्थिरांक घोषित करें

नामक एक स्टूडियो नोटबुक बनाएं sagemaker-pipelines-project.ipynb प्रोजेक्ट होम निर्देशिका के भीतर। सेल में निम्नलिखित कोड ब्लॉक दर्ज करें, और सेजमेकर और एस3 क्लाइंट ऑब्जेक्ट सेट करने के लिए सेल चलाएँ, बनाएँ PipelineSession, और S3 बकेट लोकेशन को SageMaker सत्र के साथ आने वाली डिफ़ॉल्ट बकेट का उपयोग करके सेट करें:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

पाइपलाइन पैरामीटरकरण का समर्थन करती है, जो आपको अपना पाइपलाइन कोड बदले बिना रनटाइम पर इनपुट पैरामीटर निर्दिष्ट करने की अनुमति देती है। आप के तहत उपलब्ध मॉड्यूल का उपयोग कर सकते हैं sagemaker.workflow.parameters मॉड्यूल, जैसे ParameterInteger, ParameterFloat, तथा ParameterString, विभिन्न डेटा प्रकारों के पाइपलाइन पैरामीटर निर्दिष्ट करने के लिए। एकाधिक इनपुट पैरामीटर सेट करने के लिए निम्न कोड चलाएँ:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

बैच डेटासेट जनरेट करें

बैच डेटासेट जनरेट करें, जिसका उपयोग आप बाद में बैच ट्रांसफ़ॉर्म चरण में करते हैं:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

डेटा को S3 बकेट में अपलोड करें

डेटासेट को Amazon S3 पर अपलोड करें:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

प्रोसेसिंग स्क्रिप्ट और प्रोसेसिंग स्टेप को परिभाषित करें

इस चरण में, आप फीचर इंजीनियरिंग, एक हॉट एन्कोडिंग करने के लिए एक पायथन स्क्रिप्ट तैयार करते हैं, और मॉडल निर्माण के लिए उपयोग किए जाने वाले प्रशिक्षण, सत्यापन और परीक्षण विभाजन को क्यूरेट करते हैं। अपनी प्रोसेसिंग स्क्रिप्ट बनाने के लिए निम्न कोड चलाएँ:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

इसके बाद, प्रोसेसर को इंस्टेंट करने के लिए निम्न कोड ब्लॉक चलाएँ और प्रोसेसिंग स्क्रिप्ट को चलाने के लिए पाइपलाइन चरण। क्योंकि प्रसंस्करण स्क्रिप्ट पंडों में लिखी गई है, आप उपयोग करते हैं a एसकेलर्न प्रोसेसर. पाइपलाइन ProcessingStep फ़ंक्शन निम्नलिखित तर्क लेता है: प्रोसेसर, कच्चे डेटासेट के लिए इनपुट S3 स्थान, और संसाधित डेटासेट को सहेजने के लिए आउटपुट S3 स्थान।

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

एक प्रशिक्षण चरण को परिभाषित करें

SageMaker XGBoost अनुमानक और पाइपलाइनों का उपयोग करके मॉडल प्रशिक्षण सेट करें TrainingStep समारोह:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

मूल्यांकन स्क्रिप्ट और मॉडल मूल्यांकन चरण को परिभाषित करें

एक बार प्रशिक्षित मॉडल का मूल्यांकन करने के लिए निम्नलिखित कोड ब्लॉक चलाएँ। यह स्क्रिप्ट यह जांचने के लिए तर्क को समाहित करती है कि क्या AUC स्कोर निर्दिष्ट सीमा को पूरा करता है।

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

इसके बाद, मूल्यांकन स्क्रिप्ट को चलाने के लिए प्रोसेसर और पाइपलाइन चरण को इंस्टेंट करने के लिए निम्न कोड ब्लॉक चलाएँ। क्योंकि मूल्यांकन स्क्रिप्ट XGBoost पैकेज का उपयोग करती है, आप a . का उपयोग करते हैं ScriptProcessor XGBoost छवि के साथ। पाइपलाइन ProcessingStep फ़ंक्शन निम्नलिखित तर्क लेता है: प्रोसेसर, कच्चे डेटासेट के लिए इनपुट S3 स्थान, और संसाधित डेटासेट को सहेजने के लिए आउटपुट S3 स्थान।

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

मॉडल बनाने के चरण को परिभाषित करें

पाइपलाइन मॉडल चरण का उपयोग करके सेजमेकर मॉडल बनाने के लिए निम्न कोड ब्लॉक चलाएँ। यह चरण परिनियोजन के लिए मॉडल को पैकेज करने के लिए प्रशिक्षण चरण के आउटपुट का उपयोग करता है। ध्यान दें कि इंस्टेंस प्रकार तर्क के लिए मान पाइपलाइन पैरामीटर का उपयोग करके पारित किया जाता है जिसे आपने पहले पोस्ट में परिभाषित किया था।

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

बैच ट्रांसफ़ॉर्म चरण को परिभाषित करें

पहले चरण में बनाए गए बैच इनपुट के साथ प्रशिक्षित मॉडल का उपयोग करके बैच परिवर्तन को चलाने के लिए निम्नलिखित कोड ब्लॉक चलाएँ:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

रजिस्टर मॉडल चरण को परिभाषित करें

निम्न कोड पाइपलाइन मॉडल चरण का उपयोग करके मॉडल को SageMaker मॉडल रजिस्ट्री में पंजीकृत करता है:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

पाइपलाइन को रोकने के लिए एक असफल कदम को परिभाषित करें

यदि AUC स्कोर निर्धारित सीमा को पूरा नहीं करता है, तो निम्न कोड एक त्रुटि संदेश के साथ पाइपलाइन चलाने को रोकने के लिए पाइपलाइन विफल चरण को परिभाषित करता है:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

एयूसी स्कोर की जांच के लिए एक शर्त कदम परिभाषित करें

निम्नलिखित कोड एयूसी स्कोर की जांच करने के लिए एक शर्त चरण को परिभाषित करता है और सशर्त रूप से एक मॉडल बनाता है और बैच परिवर्तन चलाता है और मॉडल रजिस्ट्री में एक मॉडल पंजीकृत करता है, या असफल स्थिति में पाइपलाइन चलाने को रोकता है:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

पाइपलाइन बनाएं और चलाएं

सभी घटक चरणों को परिभाषित करने के बाद, आप उन्हें एक पाइपलाइन ऑब्जेक्ट में इकट्ठा कर सकते हैं। आपको पाइपलाइन के क्रम को निर्दिष्ट करने की आवश्यकता नहीं है क्योंकि पाइपलाइन स्वचालित रूप से चरणों के बीच निर्भरता के आधार पर ऑर्डर अनुक्रम का अनुमान लगाती है।

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

निम्नलिखित कोड को अपनी नोटबुक के सेल में चलाएँ। यदि पाइपलाइन पहले से मौजूद है, तो कोड पाइपलाइन को अपडेट करता है। यदि पाइपलाइन मौजूद नहीं है, तो यह एक नया बनाता है।

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

निष्कर्ष

इस पोस्ट में, हमने पाइपलाइन के साथ उपलब्ध कुछ नई सुविधाओं के साथ-साथ अन्य बिल्ट-इन सेजमेकर सुविधाओं और XGBoost एल्गोरिदम को मंथन भविष्यवाणी के लिए एक मॉडल को विकसित करने, पुनरावृत्त करने और तैनात करने के लिए पेश किया। समाधान को अतिरिक्त डेटा स्रोतों के साथ बढ़ाया जा सकता है

अपने स्वयं के एमएल वर्कफ़्लो को लागू करने के लिए। पाइपलाइन वर्कफ़्लो में उपलब्ध चरणों के बारे में अधिक जानकारी के लिए, देखें अमेज़ॅन सेजमेकर मॉडल बिल्डिंग पाइपलाइन और सेजमेकर वर्कफ़्लोएडब्ल्यूएस सेजमेकर उदाहरण GitHub रेपो में पाइपलाइनों का उपयोग करते हुए विभिन्न उपयोग के मामलों के अधिक उदाहरण हैं।


लेखक के बारे में

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.जैरी पेंग एडब्ल्यूएस सेजमेकर के साथ एक सॉफ्टवेयर डेवलपमेंट इंजीनियर है। वह प्रशिक्षण से लेकर उत्पादन में मॉडल निगरानी तक बड़े पैमाने पर बड़े पैमाने पर एमएलओपीएस प्रणाली के निर्माण पर ध्यान केंद्रित करता है। उन्हें एमएलओपीएस की अवधारणा को व्यापक दर्शकों तक पहुंचाने का भी शौक है।

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.दीवेन क्यूई एडब्ल्यूएस में सॉफ्टवेयर डेवलपमेंट इंजीनियर हैं। वह वर्तमान में सेजमेकर पाइपलाइनों के विकास और सुधार पर ध्यान केंद्रित कर रही है। काम के अलावा, उसे सेलो का अभ्यास करने में मज़ा आता है।

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.गायत्री घनकोट एडब्ल्यूएस प्रोफेशनल सर्विसेज के साथ सीनियर मशीन लर्निंग इंजीनियर हैं। वह विभिन्न डोमेन में एआई / एमएल समाधानों को विकसित करने, तैनात करने और समझाने के बारे में भावुक है। इस भूमिका से पहले, उन्होंने वित्तीय और खुदरा क्षेत्र में शीर्ष वैश्विक फर्मों के साथ डेटा वैज्ञानिक और एमएल इंजीनियर के रूप में कई पहलों का नेतृत्व किया। उन्होंने यूनिवर्सिटी ऑफ कोलोराडो, बोल्डर से डेटा साइंस में विशेषीकृत कंप्यूटर साइंस में मास्टर डिग्री हासिल की है।

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.रूपिंदर ग्रेवाल एडब्ल्यूएस के साथ सीनियर एआई/एमएल स्पेशलिस्ट सॉल्यूशंस आर्किटेक्ट हैं। वह वर्तमान में सेजमेकर पर मॉडल और एमएलओ की सेवा पर ध्यान केंद्रित करता है। इस भूमिका से पहले उन्होंने मशीन लर्निंग इंजीनियर बिल्डिंग और होस्टिंग मॉडल के रूप में काम किया है। काम के अलावा उन्हें टेनिस खेलना और पहाड़ की पगडंडियों पर बाइक चलाना पसंद है।

अमेज़ॅन सेजमेकर पाइपलाइनों और अमेज़ॅन सेजमेकर एसडीके प्लेटोब्लॉकचैन डेटा इंटेलिजेंस के लिए नई सुविधाएँ। लंबवत खोज। ऐ.रे लियू एडब्ल्यूएस व्यावसायिक सेवाओं के साथ एक वरिष्ठ डेटा वैज्ञानिक हैं। उनकी विशेषता स्टार्टअप से लेकर उद्यम संगठनों तक विभिन्न आकारों के ग्राहकों के लिए एआई/एमएल समाधानों के निर्माण और संचालन पर केंद्रित है। काम के बाहर, रे को फिटनेस और यात्रा करना पसंद है।

समय टिकट:

से अधिक AWS मशीन लर्निंग