Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK

Amazon SageMaker-pijpleidingen stelt datawetenschappers en machine learning-engineers (ML) in staat om trainingsworkflows te automatiseren, wat u helpt een herhaalbaar proces te creëren om modelontwikkelingsstappen te orkestreren voor snelle experimenten en hertraining van modellen. U kunt de volledige workflow voor het bouwen van modellen automatiseren, inclusief gegevensvoorbereiding, feature-engineering, modeltraining, modelafstemming en modelvalidatie, en deze catalogiseren in het modelregister. U kunt pijplijnen zo configureren dat ze automatisch worden uitgevoerd met regelmatige tussenpozen of wanneer bepaalde gebeurtenissen worden geactiveerd, of u kunt ze indien nodig handmatig uitvoeren.

In dit bericht belichten we enkele van de verbeteringen aan de Amazon Sage Maker SDK en introduceren nieuwe functies van Amazon SageMaker Pipelines die het voor ML-beoefenaars gemakkelijker maken om ML-modellen te bouwen en te trainen.

Pipelines blijft zijn ontwikkelaarservaring innoveren en met deze recente releases kunt u de service nu op een meer aangepaste manier gebruiken:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Bijgewerkte documentatie over PipelineVariable gebruik voor schatter, processor, tuner, transformator en modelbasisklassen, Amazon-modellen en raamwerkmodellen. Er zullen aanvullende wijzigingen komen met nieuwere versies van de SDK om alle subklassen van schatters en processors te ondersteunen.
  • 2.90.0 - Beschikbaarheid van ModelStap voor geïntegreerde taken voor het maken en registreren van modelbronnen.
  • 2.88.2 - Beschikbaarheid van Pijplijnsessie voor beheerde interactie met SageMaker-entiteiten en -bronnen.
  • 2.88.2 – Compatibiliteit met subklassen voor workflow pijplijn taakstappen zodat u taakabstracties kunt bouwen en verwerkings-, training-, transformatie- en afstemmingstaken kunt configureren en uitvoeren zoals u zou doen zonder een pijplijn.
  • 2.76.0 - Beschikbaarheid van FailStap om een ​​pijplijn met een storingsstatus voorwaardelijk te stoppen.

In dit bericht leiden we u door een workflow met behulp van een voorbeeldgegevensset met een focus op modelbouw en -implementatie om te demonstreren hoe u de nieuwe functies van Pipelines kunt implementeren. Aan het einde zou u voldoende informatie moeten hebben om deze nieuwere functies met succes te gebruiken en uw ML-workloads te vereenvoudigen.

Functies overzicht

Pipelines biedt de volgende nieuwe functies:

  • Annotatie van pijplijnvariabelen – Bepaalde methodeparameters accepteren meerdere invoertypen, waaronder: PipelineVariables, en er is aanvullende documentatie toegevoegd om te verduidelijken waar PipelineVariables worden ondersteund in zowel de nieuwste stabiele versie van SageMaker SDK-documentatie als de init-handtekening van de functies. In de volgende TensorFlow-schatter toont de init-handtekening nu bijvoorbeeld dat: model_dir en image_uri ondersteuning PipelineVariables, terwijl de andere parameters dat niet doen. Voor meer informatie, zie: TensorFlow-schatter.
    • Vooraf:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Na:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pijplijnsessie - Pijplijnsessie is een nieuw concept dat is geïntroduceerd om eenheid te brengen in de SageMaker SDK en introduceert een luie initialisatie van de pijplijnbronnen (de run-aanroepen worden vastgelegd maar niet uitgevoerd totdat de pijplijn is gemaakt en uitgevoerd). De PipelineSession context erft de SageMakerSession en implementeert handige methoden voor interactie met andere SageMaker-entiteiten en -bronnen, zoals trainingstaken, eindpunten en invoergegevenssets die zijn opgeslagen in Amazon eenvoudige opslagservice (Amazone S3).
  • Compatibiliteit van subklasse met taakstappen voor workflow-pijplijn – U kunt nu taakabstracties maken en verwerkings-, training-, transformatie- en afstemmingstaken configureren en uitvoeren zoals u zou doen zonder een pijplijn.
    • Bijvoorbeeld een verwerkingsstap maken met SKLearnProcessor eerder het volgende nodig:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Zoals we in de voorgaande code zien, ProcessingStep moet in principe dezelfde voorverwerkingslogica doen als: .run, gewoon zonder de API-aanroep te starten om de taak te starten. Maar nu de compatibiliteit van subklassen is ingeschakeld met de taakstappen van de workflow-pijplijn, declareren we de step_args argument dat de preprocessing-logica met .run gebruikt, zodat u een taakabstractie kunt bouwen en deze kunt configureren zoals u deze zonder Pipelines zou gebruiken. We passeren ook in de pipeline_session, Een PipelineSession object, in plaats van sagemaker_session om ervoor te zorgen dat de run-aanroepen worden vastgelegd maar niet worden aangeroepen totdat de pijplijn is gemaakt en uitgevoerd. Zie de volgende code:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Modelstap (een gestroomlijnde aanpak met stappen voor het maken en registreren van modellen) –Pipelines biedt twee soorten stappen om te integreren met SageMaker-modellen: CreateModelStep en RegisterModel. U kunt nu beide bereiken met alleen de ModelStep type. Merk op dat een PipelineSession is nodig om dit te bereiken. Dit brengt overeenkomst tussen de pijplijnstappen en de SDK.
    • Vooraf:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Na:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail-stap (voorwaardelijke stop van de pijplijnrun) - FailStep maakt het mogelijk een pijplijn te stoppen met een storingsstatus als aan een voorwaarde wordt voldaan, bijvoorbeeld als de modelscore onder een bepaalde drempel ligt.

Overzicht oplossingen

In deze oplossing is uw toegangspunt de Amazon SageMaker Studio geïntegreerde ontwikkelomgeving (IDE) voor snel experimenteren. Studio biedt een omgeving om de end-to-end Pipelines-ervaring te beheren. Met Studio kunt u de AWS-beheerconsole voor uw volledige workflowbeheer. Voor meer informatie over het beheren van pijplijnen vanuit Studio, zie: SageMaker-pijplijnen bekijken, volgen en uitvoeren in SageMaker Studio.

Het volgende diagram illustreert de architectuur op hoog niveau van de ML-workflow met de verschillende stappen voor het trainen en genereren van gevolgtrekkingen met behulp van de nieuwe functies.

De pijplijn omvat de volgende stappen:

  1. Verwerk gegevens voor om de vereiste functies te bouwen en splits gegevens op in trein-, validatie- en testgegevenssets.
  2. Creëer een trainingstaak met het SageMaker XGBoost-framework.
  3. Evalueer het getrainde model met behulp van de testgegevensset.
  4. Controleer of de AUC-score boven een vooraf gedefinieerde drempel ligt.
    • Als de AUC-score lager is dan de drempelwaarde, stopt u de pijplijnuitvoering en markeert u deze als mislukt.
    • Als de AUC-score hoger is dan de drempel, maakt u een SageMaker-model en registreert u dit in het SageMaker-modelregister.
  5. Pas batchtransformatie toe op de gegeven gegevensset met behulp van het model dat in de vorige stap is gemaakt.

Voorwaarden

Om dit bericht te volgen, heb je een AWS-account nodig met een Studio domein.

Pipelines is rechtstreeks geïntegreerd met SageMaker-entiteiten en -bronnen, dus u hoeft geen interactie te hebben met andere AWS-services. U hoeft ook geen resources te beheren omdat het een volledig beheerde service is, wat betekent dat het resources voor u maakt en beheert. Voor meer informatie over de verschillende SageMaker-componenten die zowel zelfstandige Python-API's als geïntegreerde componenten van Studio zijn, zie de SageMaker-productpagina.

Installeer voordat u aan de slag gaat SageMaker SDK-versie >= 2.104.0 en xlrd >=1.0.0 in de Studio-notebook met behulp van het volgende codefragment:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML-workflow

Voor deze post gebruik je de volgende componenten:

  • Data voorbereiding
    • SageMaker-verwerking – SageMaker Processing is een volledig beheerde service waarmee u aangepaste gegevenstransformaties en feature-engineering voor ML-workloads kunt uitvoeren.
  • Model gebouw
  • Modeltraining en evaluatie
    • Training met één klik – De SageMaker gedistribueerde trainingsfunctie. SageMaker biedt gedistribueerde trainingsbibliotheken voor gegevensparallellisme en modelparallellisme. De bibliotheken zijn geoptimaliseerd voor de SageMaker-trainingsomgeving, helpen uw gedistribueerde trainingstaken aan te passen aan SageMaker en verbeteren de trainingssnelheid en -doorvoer.
    • SageMaker-experimenten – Experimenten is een mogelijkheid van SageMaker waarmee u uw ML-iteraties kunt organiseren, volgen, vergelijken en evalueren.
    • SageMaker batch-transformatie – Batchtransformatie of offline scoren is een beheerde service in SageMaker waarmee u kunt voorspellen op een grotere dataset met behulp van uw ML-modellen.
  • Workflow-orkestratie

Een SageMaker-pijplijn is een reeks onderling verbonden stappen die worden gedefinieerd door een JSON-pijplijndefinitie. Het codeert een pijplijn met behulp van een gerichte acyclische grafiek (DAG). De DAG geeft informatie over de vereisten voor en relaties tussen elke stap van de pijplijn, en de structuur ervan wordt bepaald door de gegevensafhankelijkheden tussen de stappen. Deze afhankelijkheden worden gemaakt wanneer de eigenschappen van de uitvoer van een stap worden doorgegeven als invoer voor een andere stap.

Het volgende diagram illustreert de verschillende stappen in de SageMaker-pijplijn (voor een gebruiksscenario voor het voorspellen van verloop) waarbij de verbindingen tussen de stappen worden afgeleid door SageMaker op basis van de invoer en uitvoer die zijn gedefinieerd door de stapdefinities.

In de volgende secties wordt beschreven hoe u elke stap van de pijplijn maakt en de volledige pijplijn uitvoert nadat deze is gemaakt.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.

Project structuur

Laten we beginnen met de projectstructuur:

  • /sm-pipelines-end-to-end-voorbeeld – De projectnaam
    • /gegevens – De datasets
    • /pijpleidingen – De codebestanden voor pijplijncomponenten
      • /klantenverloop
        • preproces.py
        • evalueren.py
    • sagemaker-pijpleidingen-project.ipynb – Een notebook die door de modelleringsworkflow loopt met behulp van de nieuwe functies van Pipelines

Download de dataset

Om dit bericht te volgen, moet je de . downloaden en opslaan voorbeeldgegevensset onder de gegevensmap in de hoofdmap van het project, waarin het bestand wordt opgeslagen in Amazon elastisch bestandssysteem (Amazon EFS) binnen de Studio-omgeving.

Bouw de pijplijncomponenten

Nu bent u klaar om de pijplijncomponenten te bouwen.

Verklaringen importeren en parameters en constanten declareren

Maak een Studio-notebook met de naam sagemaker-pipelines-project.ipynb in de homedirectory van het project. Voer het volgende codeblok in een cel in en voer de cel uit om SageMaker- en S3-clientobjecten in te stellen, maak PipelineSession, en stel de S3-bucketlocatie in met behulp van de standaardbucket die bij een SageMaker-sessie wordt geleverd:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Pipelines ondersteunt parametrering, waarmee u tijdens runtime invoerparameters kunt opgeven zonder uw pipeline-code te wijzigen. U kunt de modules gebruiken die beschikbaar zijn onder de sagemaker.workflow.parameters module, zoals: ParameterInteger, ParameterFloat en ParameterString, om pijplijnparameters van verschillende gegevenstypen op te geven. Voer de volgende code uit om meerdere invoerparameters in te stellen:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Een batchgegevensset genereren

Genereer de batchgegevensset, die u later in de batchtransformatiestap gebruikt:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Gegevens uploaden naar een S3-bucket

Upload de datasets naar Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Definieer een verwerkingsscript en verwerkingsstap

In deze stap bereidt u een Python-script voor om feature-engineering uit te voeren, één hot-codering, en beheert u de training, validatie en testsplitsingen die moeten worden gebruikt voor het bouwen van modellen. Voer de volgende code uit om uw verwerkingsscript te bouwen:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Voer vervolgens het volgende codeblok uit om de processor te instantiëren en de stap Pipelines om het verwerkingsscript uit te voeren. Omdat het verwerkingsscript in Panda's is geschreven, gebruik je a SKLearnProcessor. De pijpleidingen ProcessingStep functie heeft de volgende argumenten: de processor, de invoer S3-locaties voor onbewerkte datasets en de uitvoer S3-locaties om verwerkte datasets op te slaan.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Definieer een trainingsstap

Stel modeltraining in met behulp van een SageMaker XGBoost-schatter en de Pipelines TrainingStep functie:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Definieer het evaluatiescript en de modelevaluatiestap

Voer het volgende codeblok uit om het model te evalueren nadat het is getraind. Dit script bevat de logica om te controleren of de AUC-score aan de opgegeven drempelwaarde voldoet.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Voer vervolgens het volgende codeblok uit om de processor te instantiëren en de stap Pipelines om het evaluatiescript uit te voeren. Omdat het evaluatiescript het XGBoost-pakket gebruikt, gebruik je a ScriptProcessor samen met de XGBoost-afbeelding. De pijpleidingen ProcessingStep functie heeft de volgende argumenten: de processor, de invoer S3-locaties voor onbewerkte datasets en de uitvoer S3-locaties om verwerkte datasets op te slaan.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Definieer een stap voor het maken van een model

Voer het volgende codeblok uit om een ​​SageMaker-model te maken met behulp van de modelstap Pijplijnen. Deze stap gebruikt de uitvoer van de trainingsstap om het model te verpakken voor implementatie. Houd er rekening mee dat de waarde voor het argument van het instantietype wordt doorgegeven met behulp van de parameter Pipelines die u eerder in de post hebt gedefinieerd.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Een batchtransformatiestap definiëren

Voer het volgende codeblok uit om batchtransformatie uit te voeren met behulp van het getrainde model met de batchinvoer die in de eerste stap is gemaakt:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Definieer een registermodel stap

De volgende code registreert het model in het SageMaker-modelregister met behulp van de modelstap Pijplijnen:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Definieer een mislukte stap om de pijplijn te stoppen

De volgende code definieert de stap Pijplijnen mislukken om de pijplijnuitvoering te stoppen met een foutbericht als de AUC-score niet aan de gedefinieerde drempelwaarde voldoet:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Definieer een voorwaardestap om de AUC-score te controleren

De volgende code definieert een voorwaardestap om de AUC-score te controleren en voorwaardelijk een model te maken en een batchtransformatie uit te voeren en een model te registreren in het modelregister, of om de pijplijnuitvoering in een mislukte staat te stoppen:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

De pijplijn bouwen en uitvoeren

Nadat u alle componentstappen hebt gedefinieerd, kunt u ze samenvoegen tot een Pipelines-object. U hoeft de volgorde van de pijplijn niet op te geven, omdat Pipelines automatisch de volgorde van de volgorde afleidt op basis van de afhankelijkheden tussen de stappen.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Voer de volgende code uit in een cel in uw notitieblok. Als de pijplijn al bestaat, werkt de code de pijplijn bij. Als de pijplijn niet bestaat, wordt er een nieuwe gemaakt.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Conclusie

In dit bericht hebben we enkele van de nieuwe functies geïntroduceerd die nu beschikbaar zijn met Pipelines, samen met andere ingebouwde SageMaker-functies en het XGBoost-algoritme om een ​​model voor churn-voorspelling te ontwikkelen, herhalen en implementeren. De oplossing kan worden uitgebreid met extra gegevensbronnen

om uw eigen ML-workflow te implementeren. Voor meer details over de stappen die beschikbaar zijn in de Pipelines-workflow, zie: Amazon SageMaker Modelbouwpijpleiding en SageMaker-workflows. De AWS SageMaker-voorbeelden GitHub-repo heeft meer voorbeelden van verschillende use-cases met behulp van Pipelines.


Over de auteurs

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Jerry Peng is een softwareontwikkelaar bij AWS SageMaker. Hij richt zich op het bouwen van end-to-end grootschalige MLOps-systemen, van training tot modelmonitoring in productie. Hij is ook gepassioneerd om het concept van MLOps naar een breder publiek te brengen.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Dewen Qi is een Software Development Engineer bij AWS. Momenteel richt ze zich op het ontwikkelen en verbeteren van SageMaker Pipelines. Naast haar werk beoefent ze graag cello.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Gayatri Ghanakota is een Sr. Machine Learning Engineer bij AWS Professional Services. Ze heeft een passie voor het ontwikkelen, implementeren en uitleggen van AI/ML-oplossingen in verschillende domeinen. Voorafgaand aan deze functie leidde ze meerdere initiatieven als datawetenschapper en ML-engineer bij wereldwijde topbedrijven in de financiële en winkelruimte. Ze heeft een master's degree in Computer Science, gespecialiseerd in Data Science, van de University of Colorado, Boulder.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Rupinder Grewal is een Sr Ai/ML Specialist Solutions Architect bij AWS. Hij richt zich momenteel op het bedienen van modellen en MLOps op SageMaker. Voorafgaand aan deze functie heeft hij gewerkt als Machine Learning Engineer voor het bouwen en hosten van modellen. Naast zijn werk speelt hij graag tennis en fietst hij graag op bergpaden.

Nieuwe functies voor Amazon SageMaker Pipelines en de Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Ray Li is Sr. Data Scientist bij AWS Professional Services. Zijn specialiteit is het bouwen en operationaliseren van AI/ML-oplossingen voor klanten van verschillende groottes, variërend van startups tot enterprise organisaties. Naast zijn werk houdt Ray van fitness en reizen.

Tijdstempel:

Meer van AWS-machine learning