Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker

Pipelines Amazon SageMaker permet aux data scientists et aux ingénieurs en apprentissage automatique (ML) d'automatiser les flux de travail de formation, ce qui vous aide à créer un processus reproductible pour orchestrer les étapes de développement de modèles pour une expérimentation et un recyclage rapides des modèles. Vous pouvez automatiser l'ensemble du flux de travail de création de modèle, y compris la préparation des données, l'ingénierie des fonctionnalités, la formation du modèle, le réglage et la validation du modèle, et le cataloguer dans le registre des modèles. Vous pouvez configurer les pipelines pour qu'ils s'exécutent automatiquement à intervalles réguliers ou lorsque certains événements sont déclenchés, ou vous pouvez les exécuter manuellement selon vos besoins.

Dans cet article, nous soulignons certaines des améliorations apportées au Amazon Sage Maker SDK et introduisent de nouvelles fonctionnalités d'Amazon SageMaker Pipelines qui permettent aux praticiens du ML de créer et de former plus facilement des modèles de ML.

Pipelines continue d'innover en matière d'expérience de développement et, avec ces versions récentes, vous pouvez désormais utiliser le service de manière plus personnalisée :

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Documentation mise à jour sur PipelineVariable utilisation pour les classes de base d'estimateur, de processeur, de tuner, de transformateur et de modèle, les modèles Amazon et les modèles de cadre. Des modifications supplémentaires seront apportées avec les nouvelles versions du SDK pour prendre en charge toutes les sous-classes d'estimateurs et de processeurs.
  • 2.90.0 – Disponibilité de ModèleÉtape pour les tâches intégrées de création et d’enregistrement de ressources de modèle.
  • 2.88.2 – Disponibilité de Session Pipeline pour une interaction gérée avec les entités et les ressources SageMaker.
  • 2.88.2 – Compatibilité des sous-classes pour étapes de la tâche du pipeline de flux de travail afin que vous puissiez créer des abstractions de tâches et configurer et exécuter des tâches de traitement, de formation, de transformation et de réglage comme vous le feriez sans pipeline.
  • 2.76.0 – Disponibilité de Étape d'échec pour arrêter conditionnellement un pipeline avec un statut d'échec.

Dans cet article, nous vous présentons un flux de travail utilisant un exemple d'ensemble de données en mettant l'accent sur la création et le déploiement de modèles pour démontrer comment implémenter les nouvelles fonctionnalités de Pipelines. À la fin, vous devriez disposer de suffisamment d’informations pour utiliser avec succès ces nouvelles fonctionnalités et simplifier vos charges de travail de ML.

Aperçu des fonctionnalités

Pipelines offre les nouvelles fonctionnalités suivantes :

  • Annotation des variables de pipeline – Certains paramètres de méthode acceptent plusieurs types d'entrée, notamment PipelineVariables, et une documentation supplémentaire a été ajoutée pour clarifier où PipelineVariables sont pris en charge à la fois dans la dernière version stable de la documentation du SDK SageMaker et dans la signature init des fonctions. Par exemple, dans l'estimateur TensorFlow suivant, la signature d'initialisation montre désormais que model_dir ainsi que image_uri Support PipelineVariables, alors que les autres paramètres ne le font pas. Pour plus d'informations, reportez-vous à Estimateur TensorFlow.
    • Avant:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Après:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Séance de pipeline - Session Pipeline est un nouveau concept introduit pour apporter l'unité au sein du SDK SageMaker et introduit une initialisation paresseuse des ressources du pipeline (les appels d'exécution sont capturés mais ne sont pas exécutés tant que le pipeline n'est pas créé et exécuté). Le PipelineSession le contexte hérite de SageMakerSession et met en œuvre des méthodes pratiques pour vous permettre d'interagir avec d'autres entités et ressources SageMaker, telles que des tâches de formation, des points de terminaison et des ensembles de données d'entrée stockés dans Service de stockage simple Amazon (Amazon S3).
  • Compatibilité des sous-classes avec les étapes de travail du pipeline de workflow – Vous pouvez désormais créer des abstractions de tâches et configurer et exécuter des tâches de traitement, de formation, de transformation et de réglage comme vous le feriez sans pipeline.
    • Par exemple, créer une étape de traitement avec SKLearnProcessor auparavant, il fallait ce qui suit :
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Comme nous le voyons dans le code précédent, ProcessingStep doit faire fondamentalement la même logique de prétraitement que .run, juste sans lancer l'appel API pour démarrer le travail. Mais avec la compatibilité des sous-classes désormais activée avec les étapes de travail du pipeline de workflow, nous déclarons le step_args argument qui prend la logique de prétraitement avec .run afin que vous puissiez créer une abstraction de tâche et la configurer comme vous l'utiliseriez sans pipelines. Nous passons également dans le pipeline_session, Qui est un PipelineSession objet, au lieu de sagemaker_session pour vous assurer que les appels d'exécution sont capturés mais pas appelés tant que le pipeline n'est pas créé et exécuté. Voir le code suivant :
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Étape de modèle (une approche rationalisée avec des étapes de création et d'enregistrement de modèle) –Pipelines propose deux types d'étapes à intégrer aux modèles SageMaker : CreateModelStep ainsi que RegisterModel. Vous pouvez désormais réaliser les deux en utilisant uniquement le ModelStep taper. Notez qu'un PipelineSession est nécessaire pour y parvenir. Cela apporte une similitude entre les étapes du pipeline et le SDK.
    • Avant:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Après:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Étape d'échec (arrêt conditionnel de l'exécution du pipeline) - FailStep permet à un pipeline d'être arrêté avec un état d'échec si une condition est remplie, par exemple si le score du modèle est inférieur à un certain seuil.

Vue d'ensemble de la solution

Dans cette solution, votre point d'entrée est le Amazon SageMakerStudio environnement de développement intégré (IDE) pour une expérimentation rapide. Studio offre un environnement pour gérer l'expérience Pipelines de bout en bout. Avec Studio, vous pouvez contourner le Console de gestion AWS pour l’ensemble de la gestion de votre flux de travail. Pour plus d'informations sur la gestion des pipelines depuis Studio, reportez-vous à Afficher, suivre et exécuter des pipelines SageMaker dans SageMaker Studio.

Le diagramme suivant illustre l'architecture de haut niveau du flux de travail ML avec les différentes étapes pour entraîner et générer des inférences à l'aide des nouvelles fonctionnalités.

Le pipeline comprend les étapes suivantes :

  1. Prétraitez les données pour créer les fonctionnalités requises et divisez les données en ensembles de données d'entraînement, de validation et de test.
  2. Créez une tâche de formation avec le framework SageMaker XGBoost.
  3. Évaluez le modèle entraîné à l’aide de l’ensemble de données de test.
  4. Vérifiez si le score AUC est supérieur à un seuil prédéfini.
    • Si le score AUC est inférieur au seuil, arrêtez l’exécution du pipeline et marquez-la comme ayant échoué.
    • Si le score AUC est supérieur au seuil, créez un modèle SageMaker et enregistrez-le dans le registre des modèles SageMaker.
  5. Appliquez la transformation par lots sur l'ensemble de données donné à l'aide du modèle créé à l'étape précédente.

Pré-requis

Pour suivre cet article, vous avez besoin d'un compte AWS avec un Domaine Studio.

Pipelines est intégré directement aux entités et ressources SageMaker, vous n'avez donc pas besoin d'interagir avec d'autres services AWS. Vous n'avez pas non plus besoin de gérer de ressources car il s'agit d'un service entièrement géré, ce qui signifie qu'il crée et gère des ressources pour vous. Pour plus d'informations sur les différents composants SageMaker qui sont à la fois des API Python autonomes et des composants intégrés de Studio, consultez le Page produit SageMaker.

Avant de commencer, installez la version SageMaker SDK >= 2.104.0 et xlrd >=1.0.0 dans le notebook Studio à l'aide de l'extrait de code suivant :

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Flux de travail ML

Pour cet article, vous utilisez les composants suivants :

  • Préparation des données
    • Traitement SageMaker – SageMaker Processing est un service entièrement géré vous permettant d'exécuter des transformations de données personnalisées et une ingénierie de fonctionnalités pour les charges de travail ML.
  • Construction de modèles
  • Formation et évaluation des modèles
    • Formation en un clic – La fonctionnalité de formation distribuée SageMaker. SageMaker fournit des bibliothèques de formation distribuées pour le parallélisme des données et le parallélisme des modèles. Les bibliothèques sont optimisées pour l'environnement de formation SageMaker, vous aident à adapter vos tâches de formation distribuées à SageMaker et à améliorer la vitesse et le débit de la formation.
    • Expériences SageMaker – Les expériences sont une fonctionnalité de SageMaker qui vous permet d'organiser, de suivre, de comparer et d'évaluer vos itérations ML.
    • Transformation par lots SageMaker – La transformation par lots ou la notation hors ligne est un service géré dans SageMaker qui vous permet de prédire sur un ensemble de données plus grand à l'aide de vos modèles ML.
  • Orchestration des flux de travail

Un pipeline SageMaker est une série d'étapes interconnectées définies par une définition de pipeline JSON. Il code un pipeline à l'aide d'un graphe acyclique dirigé (DAG). Le DAG fournit des informations sur les exigences et les relations entre chaque étape du pipeline, et sa structure est déterminée par les dépendances des données entre les étapes. Ces dépendances sont créées lorsque les propriétés de la sortie d'une étape sont transmises comme entrée à une autre étape.

Le diagramme suivant illustre les différentes étapes du pipeline SageMaker (pour un cas d'utilisation de prédiction de désabonnement) où les connexions entre les étapes sont déduites par SageMaker en fonction des entrées et sorties définies par les définitions d'étape.

Les sections suivantes expliquent la création de chaque étape du pipeline et l'exécution de l'intégralité du pipeline une fois créé.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Structure du projet

Commençons par la structure du projet :

  • /sm-pipelines-exemple-de-bout-en-bout – Le nom du projet
    • /Les données – Les jeux de données
    • /conduites – Les fichiers de code des composants du pipeline
      • /désabonnement des clients
        • préprocessus.py
        • évaluer.py
    • sagemaker-pipelines-project.ipynb – Un carnet parcourant le workflow de modélisation à l'aide des nouvelles fonctionnalités de Pipelines

Télécharger le jeu de données

Pour suivre cet article, vous devez télécharger et enregistrer le exemple de jeu de données sous le dossier data dans le répertoire personnel du projet, qui enregistre le fichier dans Système de fichiers Amazon Elastic (Amazon EFS) dans l'environnement Studio.

Construire les composants du pipeline

Vous êtes maintenant prêt à créer les composants du pipeline.

Importer des instructions et déclarer des paramètres et des constantes

Créez un bloc-notes Studio appelé sagemaker-pipelines-project.ipynb dans le répertoire personnel du projet. Entrez le bloc de code suivant dans une cellule et exécutez la cellule pour configurer les objets client SageMaker et S3, créez PipelineSessionet configurez l'emplacement du compartiment S3 à l'aide du compartiment par défaut fourni avec une session SageMaker :

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines prend en charge le paramétrage, ce qui vous permet de spécifier les paramètres d'entrée au moment de l'exécution sans modifier le code de votre pipeline. Vous pouvez utiliser les modules disponibles sous le sagemaker.workflow.parameters modules, tels que ParameterInteger, ParameterFloatet ParameterString, pour spécifier les paramètres de pipeline de différents types de données. Exécutez le code suivant pour configurer plusieurs paramètres d'entrée :

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Générer un ensemble de données par lots

Générez l'ensemble de données par lots, que vous utiliserez plus tard dans l'étape de transformation par lots :

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Importer des données dans un compartiment S3

Téléchargez les ensembles de données sur Amazon S3 :

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Définir un script de traitement et une étape de traitement

Au cours de cette étape, vous préparez un script Python pour effectuer l'ingénierie des fonctionnalités, un encodage à chaud et organisez les fractionnements de formation, de validation et de test à utiliser pour la création de modèles. Exécutez le code suivant pour créer votre script de traitement :

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Ensuite, exécutez le bloc de code suivant pour instancier le processeur et l'étape Pipelines pour exécuter le script de traitement. Le script de traitement étant écrit en Pandas, vous utilisez un SKLearnProcesseur. Les pipelines ProcessingStep La fonction prend les arguments suivants : le processeur, les emplacements S3 d'entrée pour les ensembles de données brutes et les emplacements S3 de sortie pour enregistrer les ensembles de données traités.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Définir une étape de formation

Configurer la formation du modèle à l'aide d'un estimateur SageMaker XGBoost et des pipelines TrainingStep fonction:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Définir le script d'évaluation et l'étape d'évaluation du modèle

Exécutez le bloc de code suivant pour évaluer le modèle une fois entraîné. Ce script encapsule la logique pour vérifier si le score AUC atteint le seuil spécifié.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Ensuite, exécutez le bloc de code suivant pour instancier le processeur et l'étape Pipelines pour exécuter le script d'évaluation. Étant donné que le script d'évaluation utilise le package XGBoost, vous utilisez un ScriptProcessor avec l'image XGBoost. Les pipelines ProcessingStep La fonction prend les arguments suivants : le processeur, les emplacements S3 d'entrée pour les ensembles de données brutes et les emplacements S3 de sortie pour enregistrer les ensembles de données traités.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Définir une étape de création de modèle

Exécutez le bloc de code suivant pour créer un modèle SageMaker à l'aide de l'étape de modèle Pipelines. Cette étape utilise le résultat de l’étape de formation pour empaqueter le modèle en vue du déploiement. Notez que la valeur de l'argument de type d'instance est transmise à l'aide du paramètre Pipelines que vous avez défini précédemment dans l'article.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Définir une étape de transformation par lots

Exécutez le bloc de code suivant pour exécuter la transformation par lots à l'aide du modèle entraîné avec l'entrée par lots créée lors de la première étape :

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Définir une étape de modèle de registre

Le code suivant enregistre le modèle dans le registre de modèles SageMaker à l'aide de l'étape de modèle Pipelines :

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Définir une étape d'échec pour arrêter le pipeline

Le code suivant définit l'étape d'échec des pipelines pour arrêter l'exécution du pipeline avec un message d'erreur si le score AUC n'atteint pas le seuil défini :

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Définir une étape de condition pour vérifier le score AUC

Le code suivant définit une étape de condition pour vérifier le score AUC et créer conditionnellement un modèle, exécuter une transformation par lots et enregistrer un modèle dans le registre des modèles, ou arrêter l'exécution du pipeline en cas d'échec :

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Créer et exécuter le pipeline

Après avoir défini toutes les étapes du composant, vous pouvez les assembler dans un objet Pipelines. Vous n'avez pas besoin de spécifier l'ordre du pipeline, car Pipelines déduit automatiquement la séquence d'ordre en fonction des dépendances entre les étapes.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Exécutez le code suivant dans une cellule de votre bloc-notes. Si le pipeline existe déjà, le code met à jour le pipeline. Si le pipeline n’existe pas, il en crée un nouveau.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Conclusion

Dans cet article, nous avons présenté certaines des nouvelles fonctionnalités désormais disponibles avec Pipelines ainsi que d'autres fonctionnalités intégrées de SageMaker et l'algorithme XGBoost pour développer, itérer et déployer un modèle de prédiction du taux de désabonnement. La solution peut être étendue avec des sources de données supplémentaires

pour mettre en œuvre votre propre flux de travail ML. Pour plus de détails sur les étapes disponibles dans le workflow Pipelines, reportez-vous à Pipeline de création de modèles Amazon SageMaker ainsi que Flux de travail SageMakerL’ Exemples AWS SageMaker Le dépôt GitHub contient plus d'exemples autour de divers cas d'utilisation utilisant des pipelines.


À propos des auteurs

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Jerry Peng est ingénieur en développement logiciel chez AWS SageMaker. Il se concentre sur la création d'un système MLOps de bout en bout à grande échelle, de la formation à la surveillance des modèles en production. Il est également passionné par l’idée de faire connaître le concept de MLOps à un public plus large.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Dewen Qi est ingénieur en développement logiciel chez AWS. Elle se concentre actuellement sur le développement et l'amélioration des pipelines SageMaker. En dehors du travail, elle aime pratiquer le violoncelle.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Gayatri Ghanakota est un ingénieur principal en apprentissage machine chez AWS Professional Services. Elle est passionnée par le développement, le déploiement et l'explication de solutions d'IA/ML dans divers domaines. Avant d'occuper ce poste, elle a dirigé plusieurs initiatives en tant que data scientist et ingénieur ML avec les plus grandes entreprises mondiales dans le domaine de la finance et de la vente au détail. Elle est titulaire d'une maîtrise en informatique spécialisée en science des données de l'Université du Colorado à Boulder.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Rupinder Grewal est un architecte de solutions spécialisé Sr Ai/ML avec AWS. Il se concentre actuellement sur le service des modèles et des MLOps sur SageMaker. Avant d'occuper ce poste, il a travaillé en tant qu'ingénieur en apprentissage automatique pour créer et héberger des modèles. En dehors du travail, il aime jouer au tennis et faire du vélo sur les sentiers de montagne.

Nouvelles fonctionnalités pour Amazon SageMaker Pipelines et le SDK Amazon SageMaker PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Ray Li est un Data Scientist senior chez AWS Professional Services. Sa spécialité se concentre sur la création et la mise en œuvre de solutions IA/ML pour des clients de différentes tailles, allant des startups aux entreprises. En dehors du travail, Ray aime le fitness et les voyages.

Horodatage:

Plus de Apprentissage automatique AWS