Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK

Αγωγοί Amazon SageMaker επιτρέπει σε επιστήμονες δεδομένων και μηχανικούς μηχανικής μάθησης (ML) να αυτοματοποιούν τις ροές εργασίας εκπαίδευσης, κάτι που σας βοηθά να δημιουργήσετε μια επαναλαμβανόμενη διαδικασία για να ενορχηστρώσετε τα βήματα ανάπτυξης μοντέλων για γρήγορο πειραματισμό και επανεκπαίδευση μοντέλων. Μπορείτε να αυτοματοποιήσετε ολόκληρη τη ροή εργασιών κατασκευής μοντέλου, συμπεριλαμβανομένης της προετοιμασίας δεδομένων, της μηχανικής δυνατοτήτων, της εκπαίδευσης μοντέλων, του συντονισμού μοντέλων και της επικύρωσης μοντέλων, και την καταχώρισή του στο μητρώο μοντέλων. Μπορείτε να ρυθμίσετε τις παραμέτρους των αγωγών ώστε να εκτελούνται αυτόματα σε τακτά χρονικά διαστήματα ή όταν ενεργοποιούνται ορισμένα συμβάντα ή μπορείτε να τις εκτελέσετε χειροκίνητα όπως απαιτείται.

Σε αυτήν την ανάρτηση, επισημαίνουμε ορισμένες από τις βελτιώσεις στο Amazon Sage Maker SDK και εισαγάγετε νέες δυνατότητες του Amazon SageMaker Pipelines που διευκολύνουν τους επαγγελματίες ML να δημιουργήσουν και να εκπαιδεύσουν μοντέλα ML.

Η Pipelines συνεχίζει να καινοτομεί την εμπειρία προγραμματιστή της και με αυτές τις πρόσφατες εκδόσεις, μπορείτε πλέον να χρησιμοποιείτε την υπηρεσία με πιο προσαρμοσμένο τρόπο:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Ενημερωμένη τεκμηρίωση για PipelineVariable χρήση για εκτιμητές, επεξεργαστές, δέκτη, μετασχηματιστές και βασικές κατηγορίες μοντέλων, μοντέλα Amazon και μοντέλα πλαισίου. Θα υπάρξουν πρόσθετες αλλαγές με τις νεότερες εκδόσεις του SDK για την υποστήριξη όλων των υποκατηγοριών εκτιμητών και επεξεργαστών.
  • 2.90.0 - Διαθεσιμότητα του ModelStep για εργασίες δημιουργίας και καταχώρησης ολοκληρωμένων μοντέλων πόρων.
  • 2.88.2 - Διαθεσιμότητα του PipelineSession για διαχειριζόμενη αλληλεπίδραση με οντότητες και πόρους του SageMaker.
  • 2.88.2 – Συμβατότητα υποκατηγορίας για βήματα εργασίας αγωγού ροής εργασίας ώστε να μπορείτε να δημιουργήσετε αφαιρέσεις εργασιών και να διαμορφώσετε και να εκτελέσετε εργασίες επεξεργασίας, εκπαίδευσης, μετασχηματισμού και συντονισμού όπως θα κάνατε χωρίς διοχέτευση.
  • 2.76.0 - Διαθεσιμότητα του FailStep να σταματήσει υπό όρους έναν αγωγό με κατάσταση αστοχίας.

Σε αυτήν την ανάρτηση, σας καθοδηγούμε σε μια ροή εργασιών χρησιμοποιώντας ένα δείγμα δεδομένων με εστίαση στη δημιουργία και την ανάπτυξη μοντέλων για να δείξουμε πώς μπορείτε να εφαρμόσετε τις νέες δυνατότητες των Pipelines. Στο τέλος, θα πρέπει να έχετε αρκετές πληροφορίες για να χρησιμοποιήσετε με επιτυχία αυτές τις νεότερες δυνατότητες και να απλοποιήσετε τους φόρτους εργασίας ML.

Επισκόπηση λειτουργιών

Το Pipelines προσφέρει τα ακόλουθα νέα χαρακτηριστικά:

  • Σχολιασμός μεταβλητής αγωγού – Ορισμένες παράμετροι μεθόδου δέχονται πολλαπλούς τύπους εισόδου, συμπεριλαμβανομένων PipelineVariables, και έχει προστεθεί πρόσθετη τεκμηρίωση για να διευκρινιστεί πού PipelineVariables υποστηρίζονται τόσο στην πιο πρόσφατη σταθερή έκδοση της τεκμηρίωσης του SageMaker SDK όσο και στην αρχική υπογραφή των λειτουργιών. Για παράδειγμα, στον ακόλουθο εκτιμητή TensorFlow, η υπογραφή init το δείχνει τώρα αυτό model_dir και image_uri υποστήριξη PipelineVariables, ενώ οι άλλες παράμετροι όχι. Για περισσότερες πληροφορίες, ανατρέξτε στο Εκτιμητής TensorFlow.
    • Πριν:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Μετά:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Συνεδρία αγωγού - PipelineSession είναι μια νέα ιδέα που εισήχθη για να φέρει ενότητα στο SageMaker SDK και εισάγει την αργή προετοιμασία των πόρων του αγωγού (οι κλήσεις εκτέλεσης καταγράφονται αλλά δεν εκτελούνται έως ότου δημιουργηθεί και εκτελεστεί η διοχέτευση). ο PipelineSession πλαίσιο κληρονομεί το SageMakerSession και εφαρμόζει βολικές μεθόδους για να αλληλεπιδράσετε με άλλες οντότητες και πόρους του SageMaker, όπως εργασίες εκπαίδευσης, τελικά σημεία και σύνολα δεδομένων εισόδου που είναι αποθηκευμένα σε Απλή υπηρεσία αποθήκευσης Amazon (Amazon S3).
  • Συμβατότητα υποκατηγορίας με βήματα εργασίας αγωγού ροής εργασίας – Τώρα μπορείτε να δημιουργήσετε αφαιρέσεις εργασιών και να διαμορφώσετε και να εκτελέσετε εργασίες επεξεργασίας, εκπαίδευσης, μετασχηματισμού και συντονισμού όπως θα κάνατε χωρίς διοχέτευση.
    • Για παράδειγμα, η δημιουργία ενός βήματος επεξεργασίας με SKLearnProcessor προηγουμένως απαιτούνταν τα ακόλουθα:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Όπως βλέπουμε στον προηγούμενο κώδικα, ProcessingStep πρέπει να κάνει βασικά την ίδια λογική προεπεξεργασίας όπως .run, απλά χωρίς να ξεκινήσει η κλήση API για να ξεκινήσει η εργασία. Αλλά με τη συμβατότητα υποκατηγορίας που είναι πλέον ενεργοποιημένη με τα βήματα εργασίας αγωγών ροής εργασίας, δηλώνουμε το step_args όρισμα που παίρνει τη λογική προεπεξεργασίας με .run ώστε να μπορείτε να δημιουργήσετε μια αφαίρεση εργασίας και να τη διαμορφώσετε όπως θα τη χρησιμοποιούσατε χωρίς Pipelines. Περνάμε και στο pipeline_session, Η οποία είναι PipelineSession αντικείμενο, αντί για sagemaker_session για να βεβαιωθείτε ότι οι κλήσεις εκτέλεσης έχουν καταγραφεί αλλά δεν καλούνται έως ότου δημιουργηθεί και εκτελεστεί η διοχέτευση. Δείτε τον παρακάτω κώδικα:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Βήμα μοντέλου (μια βελτιωμένη προσέγγιση με βήματα δημιουργίας και εγγραφής μοντέλου) –Το Pipelines προσφέρει δύο τύπους βημάτων για ενσωμάτωση με τα μοντέλα SageMaker: CreateModelStep και RegisterModel. Τώρα μπορείτε να επιτύχετε και τα δύο χρησιμοποιώντας μόνο το ModelStep τύπος. Σημειώστε ότι α PipelineSession απαιτείται για να επιτευχθεί αυτό. Αυτό φέρνει ομοιότητα μεταξύ των βημάτων του αγωγού και του SDK.
    • Πριν:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Μετά:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Βήμα αποτυχίας (υπό όρους διακοπή της λειτουργίας του αγωγού) - FailStep επιτρέπει τη διακοπή ενός αγωγού με κατάσταση αστοχίας εάν πληρούται μια προϋπόθεση, όπως εάν η βαθμολογία του μοντέλου είναι κάτω από ένα συγκεκριμένο όριο.

Επισκόπηση λύσεων

Σε αυτή τη λύση, το σημείο εισόδου σας είναι το Στούντιο Amazon SageMaker ολοκληρωμένο περιβάλλον ανάπτυξης (IDE) για γρήγορο πειραματισμό. Το Studio προσφέρει ένα περιβάλλον για τη διαχείριση της εμπειρίας των Pipelines από άκρο σε άκρο. Με το Studio, μπορείτε να παρακάμψετε το Κονσόλα διαχείρισης AWS για ολόκληρη τη διαχείριση της ροής εργασίας σας. Για περισσότερες πληροφορίες σχετικά με τη διαχείριση Pipelines από το Studio, ανατρέξτε στο Προβολή, παρακολούθηση και εκτέλεση αγωγών SageMaker στο SageMaker Studio.

Το παρακάτω διάγραμμα απεικονίζει την αρχιτεκτονική υψηλού επιπέδου της ροής εργασίας ML με τα διαφορετικά βήματα για την εκπαίδευση και τη δημιουργία συμπερασμάτων χρησιμοποιώντας τις νέες δυνατότητες.

Ο αγωγός περιλαμβάνει τα ακόλουθα βήματα:

  1. Προεπεξεργασία δεδομένων για τη δημιουργία απαιτούμενων χαρακτηριστικών και διαχωρισμό των δεδομένων σε σύνολα δεδομένων εκπαίδευσης, επικύρωσης και δοκιμής.
  2. Δημιουργήστε μια εργασία εκπαίδευσης με το πλαίσιο SageMaker XGBoost.
  3. Αξιολογήστε το εκπαιδευμένο μοντέλο χρησιμοποιώντας το σύνολο δεδομένων δοκιμής.
  4. Ελέγξτε εάν η βαθμολογία AUC είναι πάνω από ένα προκαθορισμένο όριο.
    • Εάν η βαθμολογία AUC είναι μικρότερη από το όριο, σταματήστε τη λειτουργία του αγωγού και επισημάνετε την ως αποτυχημένη.
    • Εάν η βαθμολογία AUC είναι μεγαλύτερη από το όριο, δημιουργήστε ένα μοντέλο SageMaker και καταχωρίστε το στο μητρώο μοντέλου SageMaker.
  5. Εφαρμόστε μετασχηματισμό παρτίδας στο δεδομένο σύνολο δεδομένων χρησιμοποιώντας το μοντέλο που δημιουργήθηκε στο προηγούμενο βήμα.

Προϋποθέσεις

Για να ακολουθήσετε αυτήν την ανάρτηση, χρειάζεστε έναν λογαριασμό AWS με α Τομέας Studio.

Το Pipelines είναι ενσωματωμένο απευθείας με οντότητες και πόρους του SageMaker, επομένως δεν χρειάζεται να αλληλεπιδράτε με άλλες υπηρεσίες AWS. Επίσης, δεν χρειάζεται να διαχειρίζεστε πόρους, επειδή είναι μια πλήρως διαχειριζόμενη υπηρεσία, πράγμα που σημαίνει ότι δημιουργεί και διαχειρίζεται πόρους για εσάς. Για περισσότερες πληροφορίες σχετικά με τα διάφορα στοιχεία του SageMaker που είναι και τα δύο ανεξάρτητα Python API μαζί με ενσωματωμένα στοιχεία του Studio, ανατρέξτε στο Σελίδα προϊόντος SageMaker.

Πριν ξεκινήσετε, εγκαταστήστε το SageMaker SDK έκδοση >= 2.104.0 και xlrd >=1.0.0 στο σημειωματάριο Studio χρησιμοποιώντας το ακόλουθο απόσπασμα κώδικα:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Ροή εργασιών ML

Για αυτήν την ανάρτηση, χρησιμοποιείτε τα ακόλουθα στοιχεία:

  • Προετοιμασία δεδομένων
    • Επεξεργασία SageMaker – Το SageMaker Processing είναι μια πλήρως διαχειριζόμενη υπηρεσία που σας επιτρέπει να εκτελείτε προσαρμοσμένους μετασχηματισμούς δεδομένων και μηχανική λειτουργιών για φόρτους εργασίας ML.
  • Μοντέλο κτίριο
  • Πρότυπη εκπαίδευση και αξιολόγηση
    • Εκπαίδευση με ένα κλικ – Η δυνατότητα κατανεμημένης εκπαίδευσης SageMaker. Το SageMaker παρέχει κατανεμημένες βιβλιοθήκες εκπαίδευσης για παραλληλισμό δεδομένων και παραλληλισμό μοντέλων. Οι βιβλιοθήκες είναι βελτιστοποιημένες για το εκπαιδευτικό περιβάλλον SageMaker, βοηθούν στην προσαρμογή των κατανεμημένων εργασιών εκπαίδευσης στο SageMaker και βελτιώνουν την ταχύτητα και την απόδοση της εκπαίδευσης.
    • Πειράματα SageMaker – Το Experiments είναι μια δυνατότητα του SageMaker που σας επιτρέπει να οργανώνετε, να παρακολουθείτε, να συγκρίνετε και να αξιολογείτε τις επαναλήψεις σας ML.
    • Μετασχηματισμός παρτίδας SageMaker – Ο μετασχηματισμός παρτίδας ή η βαθμολογία εκτός σύνδεσης είναι μια διαχειριζόμενη υπηρεσία στο SageMaker που σας επιτρέπει να προβλέπετε ένα μεγαλύτερο σύνολο δεδομένων χρησιμοποιώντας τα μοντέλα ML σας.
  • Ενορχήστρωση ροής εργασιών

Ένας αγωγός SageMaker είναι μια σειρά διασυνδεδεμένων βημάτων που ορίζονται από έναν ορισμό αγωγού JSON. Κωδικοποιεί έναν αγωγό χρησιμοποιώντας ένα κατευθυνόμενο ακυκλικό γράφημα (DAG). Το DAG παρέχει πληροφορίες σχετικά με τις απαιτήσεις και τις σχέσεις μεταξύ κάθε βήματος του αγωγού και η δομή του καθορίζεται από τις εξαρτήσεις δεδομένων μεταξύ των βημάτων. Αυτές οι εξαρτήσεις δημιουργούνται όταν οι ιδιότητες της εξόδου ενός βήματος μεταβιβάζονται ως είσοδος σε ένα άλλο βήμα.

Το παρακάτω διάγραμμα απεικονίζει τα διαφορετικά βήματα στη διοχέτευση του SageMaker (για μια περίπτωση χρήσης πρόβλεψης ανατροπής) όπου οι συνδέσεις μεταξύ των βημάτων συνάγονται από το SageMaker με βάση τις εισόδους και τις εξόδους που ορίζονται από τους ορισμούς βημάτων.

Τα επόμενα τμήματα διασχίζουν τη δημιουργία κάθε βήματος του αγωγού και τη λειτουργία ολόκληρου του αγωγού μόλις δημιουργηθεί.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.

Δομή έργου

Ας ξεκινήσουμε με τη δομή του έργου:

  • /sm-pipelines-end-to-end-example – Το όνομα του έργου
    • /δεδομένα – Τα σύνολα δεδομένων
    • /σωλήνες – Τα αρχεία κώδικα για τα στοιχεία του αγωγού
      • /customerchurn
        • προεπεξεργασία.py
        • αξιολογήστε.py
    • sagemaker-pipelines-project.ipynb – Ένα σημειωματάριο που περπατά στη ροή εργασιών μοντελοποίησης χρησιμοποιώντας τις νέες δυνατότητες της Pipelines

Κατεβάστε το σύνολο δεδομένων

Για να ακολουθήσετε αυτήν την ανάρτηση, πρέπει να κατεβάσετε και να αποθηκεύσετε το δείγμα συνόλου δεδομένων κάτω από το φάκελο δεδομένων στον κεντρικό κατάλογο του έργου, ο οποίος αποθηκεύει το αρχείο Σύστημα αρχείων ελαστικού Amazon (Amazon EFS) στο περιβάλλον Studio.

Κατασκευάστε τα εξαρτήματα του αγωγού

Τώρα είστε έτοιμοι να δημιουργήσετε τα στοιχεία του αγωγού.

Εισαγωγή δηλώσεων και δήλωση παραμέτρων και σταθερών

Δημιουργήστε ένα σημειωματάριο Studio που ονομάζεται sagemaker-pipelines-project.ipynb στον αρχικό κατάλογο του έργου. Εισαγάγετε το ακόλουθο μπλοκ κώδικα σε ένα κελί και εκτελέστε το κελί για να ρυθμίσετε τα αντικείμενα πελάτη SageMaker και S3, δημιουργήστε PipelineSessionκαι ρυθμίστε τη θέση του κάδου S3 χρησιμοποιώντας τον προεπιλεγμένο κάδο που συνοδεύει μια περίοδο λειτουργίας SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Το Pipelines υποστηρίζει την παραμετροποίηση, η οποία σας επιτρέπει να καθορίσετε παραμέτρους εισόδου κατά το χρόνο εκτέλεσης χωρίς να αλλάξετε τον κώδικα του αγωγού σας. Μπορείτε να χρησιμοποιήσετε τις ενότητες που είναι διαθέσιμες κάτω από το sagemaker.workflow.parameters ενότητα, όπως ParameterInteger, ParameterFloat, να ParameterString, για να καθορίσετε παραμέτρους αγωγών διαφόρων τύπων δεδομένων. Εκτελέστε τον ακόλουθο κώδικα για να ρυθμίσετε πολλές παραμέτρους εισαγωγής:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Δημιουργήστε ένα σύνολο δεδομένων παρτίδας

Δημιουργήστε το σύνολο δεδομένων παρτίδας, το οποίο θα χρησιμοποιήσετε αργότερα στο βήμα μετασχηματισμού παρτίδας:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Μεταφορτώστε δεδομένα σε έναν κάδο S3

Ανεβάστε τα σύνολα δεδομένων στο Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Καθορίστε ένα σενάριο επεξεργασίας και ένα βήμα επεξεργασίας

Σε αυτό το βήμα, προετοιμάζετε ένα σενάριο Python για να κάνετε μηχανική χαρακτηριστικών, μια γρήγορη κωδικοποίηση και να επιμελείτε τα τμήματα εκπαίδευσης, επικύρωσης και δοκιμής που θα χρησιμοποιηθούν για τη δημιουργία μοντέλων. Εκτελέστε τον ακόλουθο κώδικα για να δημιουργήσετε το σενάριο επεξεργασίας σας:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Στη συνέχεια, εκτελέστε το ακόλουθο μπλοκ κώδικα για να δημιουργήσετε τον επεξεργαστή και το βήμα Pipelines για να εκτελέσετε το σενάριο επεξεργασίας. Επειδή το σενάριο επεξεργασίας είναι γραμμένο σε Pandas, χρησιμοποιείτε a SKLearnProcessor. Οι Αγωγοί ProcessingStep Η συνάρτηση λαμβάνει τα ακόλουθα ορίσματα: τον επεξεργαστή, τις θέσεις εισόδου S3 για μη επεξεργασμένα σύνολα δεδομένων και τις θέσεις S3 εξόδου για την αποθήκευση επεξεργασμένων συνόλων δεδομένων.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Καθορίστε ένα βήμα προπόνησης

Ρυθμίστε το μοντέλο εκπαίδευσης χρησιμοποιώντας έναν εκτιμητή SageMaker XGBoost και τις Σωληνώσεις TrainingStep λειτουργία:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Καθορίστε το βήμα αξιολόγησης του σεναρίου και του μοντέλου αξιολόγησης

Εκτελέστε το ακόλουθο μπλοκ κώδικα για να αξιολογήσετε το μοντέλο μετά την εκπαίδευση. Αυτό το σενάριο ενσωματώνει τη λογική για να ελέγξετε εάν η βαθμολογία AUC πληροί το καθορισμένο όριο.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Στη συνέχεια, εκτελέστε το ακόλουθο μπλοκ κώδικα για να δημιουργήσετε τον επεξεργαστή και το βήμα Pipelines για να εκτελέσετε το σενάριο αξιολόγησης. Επειδή το σενάριο αξιολόγησης χρησιμοποιεί το πακέτο XGBoost, χρησιμοποιείτε a ScriptProcessor μαζί με την εικόνα XGBoost. Οι Αγωγοί ProcessingStep Η συνάρτηση λαμβάνει τα ακόλουθα ορίσματα: τον επεξεργαστή, τις θέσεις εισόδου S3 για μη επεξεργασμένα σύνολα δεδομένων και τις θέσεις S3 εξόδου για την αποθήκευση επεξεργασμένων συνόλων δεδομένων.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Καθορίστε ένα βήμα δημιουργίας μοντέλου

Εκτελέστε το ακόλουθο μπλοκ κώδικα για να δημιουργήσετε ένα μοντέλο SageMaker χρησιμοποιώντας το βήμα μοντέλου Pipelines. Αυτό το βήμα χρησιμοποιεί την έξοδο του βήματος εκπαίδευσης για να συσκευάσει το μοντέλο για ανάπτυξη. Σημειώστε ότι η τιμή για το όρισμα τύπου instance μεταβιβάζεται χρησιμοποιώντας την παράμετρο Pipelines που καθορίσατε νωρίτερα στην ανάρτηση.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Ορίστε ένα βήμα μετασχηματισμού παρτίδας

Εκτελέστε το ακόλουθο μπλοκ κώδικα για να εκτελέσετε μετασχηματισμό παρτίδας χρησιμοποιώντας το εκπαιδευμένο μοντέλο με την είσοδο δέσμης που δημιουργήθηκε στο πρώτο βήμα:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Καθορίστε ένα βήμα μοντέλου εγγραφής

Ο ακόλουθος κώδικας καταχωρεί το μοντέλο στο μητρώο μοντέλου SageMaker χρησιμοποιώντας το βήμα μοντέλου Pipelines:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Καθορίστε ένα βήμα αποτυχίας για να σταματήσετε τον αγωγό

Ο ακόλουθος κώδικας ορίζει το βήμα αποτυχίας των αγωγών για να σταματήσει η εκτέλεση του αγωγού με ένα μήνυμα σφάλματος εάν η βαθμολογία AUC δεν πληροί το καθορισμένο όριο:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Καθορίστε ένα βήμα συνθήκης για να ελέγξετε τη βαθμολογία AUC

Ο παρακάτω κώδικας ορίζει ένα βήμα συνθήκης για να ελέγξετε τη βαθμολογία AUC και να δημιουργήσετε υπό όρους ένα μοντέλο και να εκτελέσετε έναν μετασχηματισμό παρτίδας και να καταχωρίσετε ένα μοντέλο στο μητρώο μοντέλων ή να σταματήσετε την εκτέλεση του αγωγού σε αποτυχημένη κατάσταση:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Κατασκευάστε και εκτελέστε τον αγωγό

Αφού ορίσετε όλα τα βήματα του στοιχείου, μπορείτε να τα συναρμολογήσετε σε ένα αντικείμενο Pipelines. Δεν χρειάζεται να καθορίσετε τη σειρά διοχέτευσης, επειδή οι Pipelines συνάγουν αυτόματα τη σειρά παραγγελιών με βάση τις εξαρτήσεις μεταξύ των βημάτων.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Εκτελέστε τον ακόλουθο κώδικα σε ένα κελί στο σημειωματάριό σας. Εάν ο αγωγός υπάρχει ήδη, ο κώδικας ενημερώνει τον αγωγό. Εάν ο αγωγός δεν υπάρχει, δημιουργεί έναν νέο.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Συμπέρασμα

Σε αυτήν την ανάρτηση, παρουσιάσαμε ορισμένες από τις νέες δυνατότητες που είναι τώρα διαθέσιμες με το Pipelines μαζί με άλλες ενσωματωμένες δυνατότητες του SageMaker και τον αλγόριθμο XGBoost για την ανάπτυξη, την επανάληψη και την ανάπτυξη ενός μοντέλου για την πρόβλεψη εκτροπής. Η λύση μπορεί να επεκταθεί με πρόσθετες πηγές δεδομένων

για να εφαρμόσετε τη δική σας ροή εργασίας ML. Για περισσότερες λεπτομέρειες σχετικά με τα βήματα που είναι διαθέσιμα στη ροή εργασίας Pipelines, ανατρέξτε στο Αγωγός κατασκευής μοντέλου Amazon SageMaker και Ροές εργασιών SageMaker. ο Παραδείγματα AWS SageMaker Το repo του GitHub έχει περισσότερα παραδείγματα σχετικά με διάφορες περιπτώσεις χρήσης που χρησιμοποιούν Pipelines.


Σχετικά με τους Συγγραφείς

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Τζέρι Πενγκ είναι μηχανικός ανάπτυξης λογισμικού με το AWS SageMaker. Επικεντρώνεται στην κατασκευή συστημάτων MLOps μεγάλης κλίμακας από άκρο σε άκρο από την εκπαίδευση έως την παρακολούθηση μοντέλων στην παραγωγή. Είναι επίσης παθιασμένος να φέρει την έννοια των MLOps στο ευρύτερο κοινό.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Dewen Qi είναι Μηχανικός Ανάπτυξης Λογισμικού στην AWS. Επί του παρόντος εστιάζει στην ανάπτυξη και τη βελτίωση των αγωγών SageMaker. Εκτός δουλειάς, της αρέσει να ασκεί βιολοντσέλο.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Gayatri Ghanakota είναι Sr. Machine Learning Engineer με AWS Professional Services. Είναι παθιασμένη με την ανάπτυξη, την ανάπτυξη και την εξήγηση λύσεων AI/ML σε διάφορους τομείς. Πριν από αυτόν τον ρόλο, ηγήθηκε πολλαπλών πρωτοβουλιών ως επιστήμονας δεδομένων και μηχανικός ML με κορυφαίες παγκόσμιες εταιρείες στον χρηματοοικονομικό και λιανικό χώρο. Είναι κάτοχος μεταπτυχιακού τίτλου στην Επιστήμη Υπολογιστών με εξειδίκευση στην Επιστήμη των Δεδομένων από το Πανεπιστήμιο του Κολοράντο, Boulder.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Ρούπιντερ Γκρέουαλ είναι Sr Ai/ML Specialist Solutions Architect με AWS. Επί του παρόντος εστιάζει στην εξυπηρέτηση μοντέλων και MLOps στο SageMaker. Πριν από αυτόν τον ρόλο έχει εργαστεί ως μηχανικός μηχανικής εκμάθησης μοντέλων κατασκευής και φιλοξενίας. Εκτός δουλειάς του αρέσει να παίζει τένις και να κάνει ποδήλατο σε ορεινά μονοπάτια.

Νέες δυνατότητες για το Amazon SageMaker Pipelines και το Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Κάθετη αναζήτηση. Ολα συμπεριλαμβάνονται.Ρέι Λι είναι Sr. Data Scientist με AWS Professional Services. Η ειδικότητά του επικεντρώνεται στη δημιουργία και τη λειτουργικότητα λύσεων AI/ML για πελάτες διαφορετικών μεγεθών, που κυμαίνονται από νεοφυείς επιχειρήσεις έως επιχειρηματικούς οργανισμούς. Εκτός δουλειάς, ο Ρέι απολαμβάνει τη φυσική κατάσταση και τα ταξίδια.

Σφραγίδα ώρας:

Περισσότερα από Μηχανική εκμάθηση AWS