Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK

Amazonski cevovodi SageMaker omogoča podatkovnim znanstvenikom in inženirjem strojnega učenja (ML), da avtomatizirajo delovne tokove usposabljanja, kar vam pomaga ustvariti ponovljiv proces za usmerjanje razvojnih korakov modela za hitro eksperimentiranje in ponovno usposabljanje modela. Avtomatizirate lahko celoten delovni tok gradnje modela, vključno s pripravo podatkov, inženiringom funkcij, usposabljanjem modela, prilagajanjem modela in preverjanjem veljavnosti modela, ter ga katalogizirate v registru modelov. Cevovode lahko konfigurirate tako, da se izvajajo samodejno v rednih intervalih ali ko se sprožijo določeni dogodki, ali pa jih po potrebi zaženete ročno.

V tej objavi izpostavljamo nekatere izboljšave za Amazon SageMaker SDK in predstavite nove funkcije Amazon SageMaker Pipelines, ki strokovnjakom ML olajšajo izdelavo in usposabljanje modelov ML.

Pipelines še naprej izpopolnjuje svojo izkušnjo za razvijalce in s temi nedavnimi izdajami lahko zdaj storitev uporabljate na bolj prilagojen način:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – posodobljena dokumentacija o PipelineVariable uporaba za osnovne razrede ocenjevalcev, procesorjev, sprejemnikov, transformatorjev in modelov, Amazonove modele in okvirne modele. Prišlo bo do dodatnih sprememb z novejšimi različicami SDK za podporo vsem podrazredom ocenjevalcev in procesorjev.
  • 2.90.0 – Razpoložljivost ModelStep za naloge ustvarjanja in registracije virov integriranega modela.
  • 2.88.2 – Razpoložljivost PipelineSession za upravljano interakcijo z entitetami in viri SageMaker.
  • 2.88.2 – Združljivost podrazredov za koraki dela cevovoda delovnega toka tako da lahko gradite abstrakcije opravil ter konfigurirate in izvajate opravila obdelave, usposabljanja, preoblikovanja in prilagajanja, kot bi to storili brez cevovoda.
  • 2.76.0 – Razpoložljivost FailStep za pogojno zaustavitev cevovoda s statusom okvare.

V tej objavi vas vodimo skozi potek dela z uporabo vzorčnega nabora podatkov s poudarkom na gradnji in uvajanju modela, da pokažemo, kako implementirati nove funkcije Pipelines. Na koncu bi morali imeti dovolj informacij za uspešno uporabo teh novejših funkcij in poenostavitev delovnih obremenitev ML.

Pregled funkcij

Cevovodi ponujajo naslednje nove funkcije:

  • Opomba spremenljivke cevovoda – Nekateri parametri metode sprejemajo več vrst vnosa, vključno z PipelineVariables, dodana pa je bila dodatna dokumentacija, ki pojasnjuje, kje PipelineVariables so podprte tako v najnovejši stabilni različici dokumentacije SDK SageMaker kot v zagonskem podpisu funkcij. Na primer, v naslednjem ocenjevalcu TensorFlow začetni podpis to zdaj pokaže model_dir in image_uri podpora PipelineVariables, medtem ko drugi parametri ne. Za več informacij glejte Ocenjevalnik TensorFlow.
    • Pred:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Po:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Pipeline session - PipelineSession je nov koncept, uveden za enotnost SDK-ja SageMaker in uvaja leno inicializacijo virov cevovoda (klici izvajanja so zajeti, vendar se ne izvajajo, dokler cevovod ni ustvarjen in zagnan). The PipelineSession kontekst podeduje SageMakerSession in izvaja priročne metode za vašo interakcijo z drugimi entitetami in viri SageMaker, kot so delovna mesta za usposabljanje, končne točke in nabori vhodnih podatkov, shranjeni v Preprosta storitev shranjevanja Amazon (Amazon S3).
  • Združljivost podrazreda s koraki opravila cevovoda delovnega toka – Zdaj lahko gradite abstrakcije opravil ter konfigurirate in izvajate opravila obdelave, usposabljanja, preoblikovanja in prilagajanja, kot bi to storili brez cevovoda.
    • Na primer, ustvarjanje koraka obdelave z SKLearnProcessor prej zahteval naslednje:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Kot vidimo v prejšnji kodi, ProcessingStep mora izvesti v bistvu enako logiko predprocesiranja kot .run, samo brez sprožitve klica API za začetek opravila. Toda z združljivostjo podrazredov, ki je zdaj omogočena s koraki opravil cevovoda delovnega toka, razglasimo step_args argument, ki prevzame logiko predprocesiranja z .run, tako da lahko zgradite abstrakcijo opravila in jo konfigurirate, kot bi jo uporabljali brez cevovodov. Prehajamo tudi v pipeline_session, Ki je PipelineSession objekt, namesto sagemaker_session da zagotovite, da so zagonski klici zajeti, vendar ne klicani, dokler cevovod ni ustvarjen in zagnan. Oglejte si naslednjo kodo:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Korak modela (poenostavljen pristop s koraki ustvarjanja modela in registracije) –Pipelines ponuja dve vrsti korakov za integracijo z modeli SageMaker: CreateModelStep in RegisterModel. Zdaj lahko oboje dosežete le z uporabo ModelStep vrsto. Upoštevajte, da a PipelineSession je potrebno za dosego tega. To prinaša podobnost med koraki cevovoda in SDK.
    • Pred:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Po:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Fail step (pogojna ustavitev teka cevovoda) - FailStep omogoča zaustavitev cevovoda s statusom napake, če je izpolnjen pogoj, na primer, če je rezultat modela pod določenim pragom.

Pregled rešitev

V tej rešitvi je vaša vstopna točka Amazon SageMaker Studio integrirano razvojno okolje (IDE) za hitro eksperimentiranje. Studio ponuja okolje za upravljanje izkušnje Pipelines od konca do konca. S programom Studio lahko obidete Konzola za upravljanje AWS za celotno upravljanje poteka dela. Za več informacij o upravljanju cevovodov znotraj Studia glejte Oglejte si, sledite in izvajajte cevovode SageMaker v SageMaker Studio.

Naslednji diagram ponazarja visokonivojsko arhitekturo delovnega toka ML z različnimi koraki za usposabljanje in ustvarjanje sklepanja z uporabo novih funkcij.

Cevovod vključuje naslednje korake:

  1. Vnaprej obdelajte podatke za izgradnjo zahtevanih funkcij in razdelite podatke na nabore podatkov za usposabljanje, validacijo in testiranje.
  2. Ustvarite delovno mesto za usposabljanje z ogrodjem SageMaker XGBoost.
  3. Ocenite usposobljeni model z uporabo preskusnega nabora podatkov.
  4. Preverite, ali je rezultat AUC nad vnaprej določenim pragom.
    • Če je rezultat AUC nižji od praga, ustavite zagon cevovoda in ga označite kot neuspešnega.
    • Če je rezultat AUC višji od praga, ustvarite model SageMaker in ga registrirajte v registru modelov SageMaker.
  5. Uporabite paketno transformacijo na danem naboru podatkov z uporabo modela, ustvarjenega v prejšnjem koraku.

Predpogoji

Če želite slediti tej objavi, potrebujete račun AWS z a Studio domena.

Cevovodi so integrirani neposredno z entitetami in viri SageMaker, tako da vam ni treba komunicirati z drugimi storitvami AWS. Prav tako vam ni treba upravljati virov, ker gre za popolnoma upravljano storitev, kar pomeni, da ustvarja in upravlja vire namesto vas. Za več informacij o različnih komponentah SageMaker, ki so samostojni API-ji Python skupaj z integriranimi komponentami Studia, glejte Stran izdelka SageMaker.

Preden začnete, namestite različico SDK SageMaker >= 2.104.0 in xlrd >=1.0.0 v prenosnem računalniku Studio z naslednjim delčkom kode:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Potek dela ML

Za to objavo uporabite naslednje komponente:

  • Priprava podatkov
    • Obdelava žajblja – SageMaker Processing je popolnoma upravljana storitev, ki vam omogoča izvajanje transformacij podatkov po meri in inženiring funkcij za delovne obremenitve ML.
  • Izdelava makete
  • Modelno usposabljanje in vrednotenje
    • Usposabljanje z enim klikom – Funkcija porazdeljenega usposabljanja SageMaker. SageMaker zagotavlja porazdeljene učne knjižnice za paralelizem podatkov in modelov. Knjižnice so optimizirane za vadbeno okolje SageMaker, pomagajo prilagoditi vaša porazdeljena vadbena opravila SageMakerju ter izboljšajo hitrost in prepustnost vadbe.
    • Poskusi SageMaker – Eksperimenti so zmožnost SageMakerja, ki vam omogoča organiziranje, sledenje, primerjavo in ocenjevanje vaših iteracij ML.
    • Serijska preobrazba SageMaker – Paketno preoblikovanje ali točkovanje brez povezave je upravljana storitev v SageMakerju, ki vam omogoča napovedovanje na večjem naboru podatkov z uporabo vaših modelov ML.
  • Orkestracija poteka dela

Cevovod SageMaker je niz medsebojno povezanih korakov, ki jih definira definicija cevovoda JSON. Kodira cevovod z uporabo usmerjenega acikličnega grafa (DAG). DAG daje informacije o zahtevah in odnosih med posameznimi koraki cevovoda, njegovo strukturo pa določajo odvisnosti podatkov med koraki. Te odvisnosti se ustvarijo, ko se lastnosti izhoda koraka posredujejo kot vhod v drug korak.

Naslednji diagram ponazarja različne korake v cevovodu SageMaker (za primer uporabe predvidevanja odliva), kjer SageMaker sklepa o povezavah med koraki na podlagi vhodov in izhodov, opredeljenih z definicijami korakov.

Naslednji razdelki prikazujejo ustvarjanje vsakega koraka cevovoda in izvajanje celotnega cevovoda, ko je ustvarjen.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.

Struktura projekta

Začnimo s strukturo projekta:

  • /sm-pipelines-end-to-end-example – Ime projekta
    • / podatki – Nabori podatkov
    • /cevovodi – Datoteke kode za komponente cevovoda
      • /customerchurn
        • preprocess.py
        • oceni.py
    • sagemaker-pipelines-project.ipynb – Beležnica, ki se sprehaja skozi potek dela modeliranja z uporabo novih funkcij Pipelines

Prenesite nabor podatkov

Če želite slediti tej objavi, morate prenesti in shraniti vzorec nabora podatkov pod podatkovno mapo v domačem imeniku projekta, v kateri se datoteka shrani Elastični datotečni sistem Amazon (Amazon EFS) v okolju Studio.

Zgradite komponente cevovoda

Zdaj ste pripravljeni na izdelavo komponent cevovoda.

Uvozi stavke ter deklariraj parametre in konstante

Ustvarite zvezek Studio, imenovan sagemaker-pipelines-project.ipynb znotraj domačega imenika projekta. Vnesite naslednji blok kode v celico in zaženite celico, da nastavite odjemalske predmete SageMaker in S3, ustvarite PipelineSessionin nastavite lokacijo vedra S3 s privzetim vedrom, ki je priloženo seji SageMaker:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Cevovodi podpirajo parametrizacijo, ki vam omogoča, da določite vhodne parametre med izvajanjem, ne da bi spremenili kodo cevovoda. Uporabite lahko module, ki so na voljo pod sagemaker.workflow.parameters modul, kot npr ParameterInteger, ParameterFloatin ParameterString, da podate parametre cevovoda različnih tipov podatkov. Zaženite naslednjo kodo, da nastavite več vhodnih parametrov:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Ustvari paketni nabor podatkov

Ustvarite paketni nabor podatkov, ki ga boste pozneje uporabili v koraku paketnega preoblikovanja:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Naložite podatke v vedro S3

Naložite nabore podatkov v Amazon S3:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Določite skript obdelave in korak obdelave

V tem koraku pripravite skript Python za inženiring funkcij, eno vroče kodiranje in pripravite razdelitve usposabljanja, validacije in testiranja, ki se bodo uporabljali za gradnjo modela. Za izgradnjo skripta za obdelavo zaženite naslednjo kodo:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Nato zaženite naslednji blok kode, da ustvarite primerek procesorja, in korak Cevovodi, da zaženete skript za obdelavo. Ker je skript za obdelavo napisan v Pandas, uporabite a SKLearnProcessor. Cevovodi ProcessingStep funkcija sprejme naslednje argumente: procesor, vhodne lokacije S3 za neobdelane nabore podatkov in izhodne lokacije S3 za shranjevanje obdelanih naborov podatkov.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Določite korak usposabljanja

Nastavite usposabljanje modela z uporabo ocenjevalca SageMaker XGBoost in cevovodov TrainingStep funkcija:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Določite skript ocenjevanja in korak ocenjevanja modela

Zaženite naslednji kodni blok, da ocenite model, ko je usposobljen. Ta skript vsebuje logiko za preverjanje, ali rezultat AUC ustreza podanemu pragu.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Nato zaženite naslednji blok kode, da instancirate procesor in korak Cevovodi, da zaženete ocenjevalni skript. Ker ocenjevalni skript uporablja paket XGBoost, uporabite a ScriptProcessor skupaj s sliko XGBoost. Cevovodi ProcessingStep funkcija sprejme naslednje argumente: procesor, vhodne lokacije S3 za neobdelane nabore podatkov in izhodne lokacije S3 za shranjevanje obdelanih naborov podatkov.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Določite korak ustvarjanja modela

Zaženite naslednji blok kode, da ustvarite model SageMaker z uporabo koraka modela Cevovodi. Ta korak uporablja izhod koraka usposabljanja za pakiranje modela za uvajanje. Upoštevajte, da je vrednost za argument vrste primerka posredovana s parametrom Cevovodi, ki ste ga definirali prej v objavi.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Določite korak paketne transformacije

Zaženite naslednji blok kode, da zaženete paketno transformacijo z uporabo usposobljenega modela s paketnim vnosom, ustvarjenim v prvem koraku:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Določite korak modela registra

Naslednja koda registrira model v registru modelov SageMaker z uporabo koraka modela Cevovodi:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Določite neuspeli korak za zaustavitev cevovoda

Naslednja koda definira korak Pipelines fail za zaustavitev delovanja cevovoda s sporočilom o napaki, če rezultat AUC ne doseže definiranega praga:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Določite korak pogoja za preverjanje ocene AUC

Naslednja koda definira korak pogoja za preverjanje ocene AUC in pogojno ustvarjanje modela ter zagon paketne transformacije in registracijo modela v registru modelov ali zaustavitev izvajanja cevovoda v neuspelem stanju:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Zgradite in zaženite cevovod

Ko definirate vse korake komponente, jih lahko sestavite v objekt Cevovodi. Ni vam treba določiti vrstnega reda cevovoda, ker Cevovodi samodejno sklepajo zaporedje vrstnega reda na podlagi odvisnosti med koraki.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Zaženite naslednjo kodo v celici svojega zvezka. Če cevovod že obstaja, ga koda posodobi. Če cevovod ne obstaja, ustvari novega.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

zaključek

V tej objavi smo predstavili nekaj novih funkcij, ki so zdaj na voljo s storitvijo Pipelines, skupaj z drugimi vgrajenimi funkcijami SageMaker in algoritmom XGBoost za razvoj, ponavljanje in uvajanje modela za napovedovanje opuščanja. Rešitev je mogoče razširiti z dodatnimi viri podatkov

za implementacijo lastnega delovnega procesa ML. Za več podrobnosti o korakih, ki so na voljo v poteku dela Cevovodi, glejte Amazon SageMaker Model Building Pipeline in Poteki dela SageMaker. Primeri AWS SageMaker GitHub repo ima več primerov različnih primerov uporabe z uporabo cevovodov.


O avtorjih

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.Jerry Peng je inženir za razvoj programske opreme pri AWS SageMaker. Osredotoča se na izgradnjo obsežnega sistema MLOps od konca do konca od usposabljanja do spremljanja modela v proizvodnji. Prav tako se strastno trudi predstaviti koncept MLOps širšemu občinstvu.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.Dewen Qi je inženir za razvoj programske opreme v AWS. Trenutno se osredotoča na razvoj in izboljšanje cevovodov SageMaker. Zunaj službe rada vadi violončelo.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.Gayatri Ghanakota je višji inženir strojnega učenja pri AWS Professional Services. Navdušena je nad razvojem, uvajanjem in razlago rešitev AI/ML na različnih področjih. Pred to vlogo je vodila številne pobude kot podatkovna znanstvenica in inženirka ML z vrhunskimi svetovnimi podjetji v finančnem in maloprodajnem prostoru. Ima magisterij iz računalništva, specializiran za podatkovno znanost, na univerzi Colorado, Boulder.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.Rupinder Grewal je Sr Ai/ML Specialist Solutions Architect pri AWS. Trenutno se osredotoča na streženje modelov in MLO na SageMakerju. Pred to vlogo je delal kot inženir strojnega učenja za gradnjo in gostovanje modelov. Izven službe rad igra tenis in kolesari po gorskih poteh.

Nove funkcije za Amazon SageMaker Pipelines in Amazon SageMaker SDK PlatoBlockchain Data Intelligence. Navpično iskanje. Ai.Ray Li je višji podatkovni znanstvenik pri AWS Professional Services. Njegova posebnost se osredotoča na gradnjo in operacionalizacijo rešitev AI/ML za stranke različnih velikosti, od startupov do podjetniških organizacij. Poleg službe Ray uživa v fitnesu in potovanjih.

Časovni žig:

Več od Strojno učenje AWS