Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle

Amazon SageMaker -putkistot antaa datatieteilijöille ja koneoppimisen (ML) insinööreille mahdollisuuden automatisoida koulutustyönkulkuja, mikä auttaa sinua luomaan toistettavan prosessin mallinkehitysvaiheiden organisoimiseksi nopeaa kokeilua ja mallien uudelleenkoulutusta varten. Voit automatisoida koko mallin rakentamisen työnkulun, mukaan lukien tietojen valmistelun, ominaisuuksien suunnittelun, mallikoulutuksen, mallin virityksen ja mallin validoinnin, ja luetteloida sen mallirekisteriin. Voit määrittää liukuhihnat toimimaan automaattisesti säännöllisin väliajoin tai kun tietyt tapahtumat käynnistyvät, tai voit suorittaa ne manuaalisesti tarpeen mukaan.

Tässä viestissä korostamme joitain parannuksia Amazon Sage Maker SDK ja esittelevät Amazon SageMaker Pipelinesin uusia ominaisuuksia, jotka helpottavat ML-harjoittajien ML-mallien rakentamista ja kouluttamista.

Pipelines jatkaa kehittäjäkokemuksensa uudistamista, ja näiden viimeaikaisten julkaisujen ansiosta voit nyt käyttää palvelua entistä räätälöidymmin:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Päivitetty dokumentaatio PipelineVariable käyttö estimaattorille, prosessorille, virittimelle, muuntajalle ja mallin perusluokille, Amazon-malleille ja kehysmalleille. SDK:n uudempiin versioihin tulee lisää muutoksia, jotka tukevat kaikkia arvioijien ja prosessorien alaluokkia.
  • 2.90.0 – Saatavuus ModelStep integroitujen malliresurssien luonti- ja rekisteröintitehtäviin.
  • 2.88.2 – Saatavuus PipelineSession hallittua vuorovaikutusta SageMaker-yksiköiden ja -resurssien kanssa.
  • 2.88.2 – Alaluokkayhteensopivuus työnkulun putkilinjan työn vaiheet joten voit rakentaa töiden abstraktioita ja määrittää ja suorittaa prosessointi-, koulutus-, muunnos- ja viritystöitä aivan kuten ilman putkia.
  • 2.76.0 – Saatavuus FailStep pysäyttää ehdollisesti viallisen putkilinjan.

Tässä viestissä opastamme sinut työnkulun läpi käyttämällä esimerkkitietojoukkoa, jossa keskitytään mallin rakentamiseen ja käyttöönottoon, jotta voidaan näyttää, kuinka Pipelinesin uudet ominaisuudet otetaan käyttöön. Loppujen lopuksi sinulla pitäisi olla tarpeeksi tietoa, jotta voit käyttää näitä uudempia ominaisuuksia ja yksinkertaistaa ML-työkuormia.

Ominaisuuksien yleiskatsaus

Pipelines tarjoaa seuraavat uudet ominaisuudet:

  • Pipeline-muuttujan huomautus – Tietyt menetelmäparametrit hyväksyvät useita syöttötyyppejä, mukaan lukien PipelineVariables, ja lisäasiakirjoja on lisätty selventämään missä PipelineVariables ovat tuettuja sekä SageMaker SDK:n uusimmassa vakaassa versiossa että funktioiden aloitusallekirjoituksessa. Esimerkiksi seuraavassa TensorFlow-estimaattorissa init-allekirjoitus näyttää nyt sen model_dir ja image_uri tuki PipelineVariables, kun taas muut parametrit eivät. Lisätietoja on kohdassa TensorFlow estimaattori.
    • Ennen:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Jälkeen:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Putkilinjan istunto - PipelineSession on uusi konsepti, joka esiteltiin yhtenäistämään SageMaker SDK:ta ja esittelee liukuhihnaresurssien laiskan alustuksen (ajokutsut kaapataan, mutta niitä ei suoriteta ennen kuin liukuhihna on luotu ja suoritettu). The PipelineSession konteksti perii SageMakerSession ja ottaa käyttöön käteviä menetelmiä, joiden avulla voit olla vuorovaikutuksessa muiden SageMaker-kokonaisuuksien ja -resurssien kanssa, kuten koulutustöitä, päätepisteitä ja syöttötietojoukkoja, jotka on tallennettu Amazonin yksinkertainen tallennuspalvelu (Amazon S3).
  • Alaluokan yhteensopivuus työnkulun putkilinjan työn vaiheiden kanssa – Voit nyt rakentaa töiden abstraktioita ja konfiguroida ja suorittaa prosessointi-, koulutus-, muunnos- ja viritystöitä aivan kuten ilman putkistoa.
    • Esimerkiksi luomalla käsittelyvaiheen kanssa SKLearnProcessor vaadittiin aiemmin seuraavaa:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Kuten näemme edellisestä koodista, ProcessingStep täytyy tehdä periaatteessa sama esikäsittelylogiikka kuin .run, vain aloittamatta API-kutsua työn aloittamiseksi. Mutta kun alaluokan yhteensopivuus on nyt otettu käyttöön työnkulun putkilinjan työn vaiheilla, julistamme step_args argumentti, joka ottaa esikäsittelylogiikan .run:n kanssa, jotta voit rakentaa työn abstraktion ja määrittää sen samalla tavalla kuin käyttäisit sitä ilman putkia. Kuljemme myös sisään pipeline_session, Joka on PipelineSession objekti sen sijaan sagemaker_session varmistaaksesi, että ajokutsut kaapataan, mutta niitä ei kutsuta ennen kuin liukuhihna on luotu ja suoritettu. Katso seuraava koodi:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Mallivaihe (virtaviivainen lähestymistapa mallin luonti- ja rekisteröintivaiheineen) -Pipelines tarjoaa kaksi vaihetyyppiä integroitaviksi SageMaker-malleihin: CreateModelStep ja RegisterModel. Voit nyt saavuttaa molemmat käyttämällä vain ModelStep tyyppi. Huomaa, että a PipelineSession tarvitaan tämän saavuttamiseksi. Tämä tuo samankaltaisuuden liukuhihnavaiheiden ja SDK:n välille.
    • Ennen:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Jälkeen:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Epäonnistunut vaihe (putkilinjan ajon ehdollinen pysäytys) - FailStep sallii putkilinjan pysäyttämisen vikatilassa, jos ehto täyttyy, kuten jos mallin pistemäärä on alle tietyn kynnyksen.

Ratkaisun yleiskatsaus

Tässä ratkaisussa aloituspisteesi on Amazon SageMaker Studio integroitu kehitysympäristö (IDE) nopeaa kokeilua varten. Studio tarjoaa ympäristön päästä päähän Pipelines-kokemuksen hallintaan. Studion avulla voit ohittaa AWS-hallintakonsoli koko työnkulun hallintaan. Lisätietoja putkien hallinnasta Studiossa on kohdassa Tarkastele, seuraa ja suorita SageMaker-putkistoja SageMaker Studiossa.

Seuraava kaavio havainnollistaa ML-työnkulun korkean tason arkkitehtuuria eri vaiheilla kouluttaa ja luoda päätelmiä käyttämällä uusia ominaisuuksia.

Putkilinja sisältää seuraavat vaiheet:

  1. Esikäsittele tiedot tarvittavien ominaisuuksien rakentamiseksi ja jaa tiedot juna-, validointi- ja testitietosarjoiksi.
  2. Luo koulutustyö SageMaker XGBoost -kehyksen avulla.
  3. Arvioi koulutettu malli testitietojoukon avulla.
  4. Tarkista, onko AUC-pistemäärä ennalta määritellyn kynnyksen yläpuolella.
    • Jos AUC-pistemäärä on pienempi kuin kynnys, pysäytä liukuhihnan ajo ja merkitse se epäonnistuneeksi.
    • Jos AUC-pistemäärä on suurempi kuin kynnys, luo SageMaker-malli ja rekisteröi se SageMaker-mallirekisteriin.
  5. Käytä erämuutosta annetussa tietojoukossa käyttämällä edellisessä vaiheessa luotua mallia.

Edellytykset

Jotta voit seurata tätä viestiä, tarvitset AWS-tilin a Studion verkkotunnus.

Pipelines on integroitu suoraan SageMaker-kokonaisuuksiin ja -resursseihin, joten sinun ei tarvitse olla vuorovaikutuksessa muiden AWS-palvelujen kanssa. Sinun ei myöskään tarvitse hallita resursseja, koska se on täysin hallittu palvelu, mikä tarkoittaa, että se luo ja hallitsee resursseja puolestasi. Lisätietoja eri SageMaker-komponenteista, jotka ovat sekä itsenäisiä Python-sovellusliittymiä että Studion integroituja komponentteja, on SageMaker tuotesivu.

Ennen kuin aloitat, asenna SageMaker SDK -versio >= 2.104.0 ja xlrd >=1.0.0 Studio-muistikirjaan käyttämällä seuraavaa koodinpätkää:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML-työnkulku

Tässä viestissä käytät seuraavia komponentteja:

  • Tietojen valmistelu
    • SageMaker-käsittely – SageMaker Processing on täysin hallittu palvelu, jonka avulla voit suorittaa mukautettuja datamuunnoksia ja ominaisuussuunnittelua ML-työkuormille.
  • Mallirakennus
  • Mallin koulutus ja arviointi
    • Harjoittelu yhdellä napsautuksella – SageMakerin hajautettu harjoitusominaisuus. SageMaker tarjoaa hajautettuja koulutuskirjastoja tietojen rinnakkaisuudelle ja mallin rinnakkaisuudelle. Kirjastot on optimoitu SageMaker-harjoitusympäristöä varten, ne auttavat mukauttamaan hajautettuja harjoitustyösi SageMakeriin ja parantavat harjoitusnopeutta ja suoritustehoa.
    • SageMaker-kokeet – Experiments on SageMakerin ominaisuus, jonka avulla voit järjestää, seurata, vertailla ja arvioida ML-iteraatioita.
    • SageMaker-erämuunnos – Erämuunnos tai offline-pisteytys on hallittu palvelu SageMakerissa, jonka avulla voit ennustaa suuremman tietojoukon ML-mallejasi käyttämällä.
  • Työnkulun orkestrointi

SageMaker-liukuhihna on sarja toisiinsa yhdistettyjä vaiheita, jotka on määritelty JSON-liukuhihnamääritelmässä. Se koodaa liukuhihnan käyttämällä suunnattua asyklistä kuvaajaa (DAG). DAG antaa tietoa liukuhihnan kunkin vaiheen vaatimuksista ja suhteista, ja sen rakenteen määräävät vaiheiden väliset datariippuvuudet. Nämä riippuvuudet luodaan, kun vaiheen lähdön ominaisuudet välitetään syötteenä toiseen vaiheeseen.

Seuraava kaavio havainnollistaa SageMaker-liukuhihnan eri vaiheita (vaihtumisennusteen käyttötapauksessa), joissa SageMaker päättelee vaiheiden väliset yhteydet vaihemääritelmien määrittämien tulojen ja lähtöjen perusteella.

Seuraavissa osissa käydään läpi putkilinjan kunkin vaiheen luominen ja koko putkilinjan käyttäminen luomisen jälkeen.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.

Hankkeen rakenne

Aloitetaan projektin rakenteesta:

  • /sm-pipelines-end-to-end-example – Projektin nimi
    • / data – Tietojoukot
    • /putket – Liukuhihnakomponenttien kooditiedostot
      • /customerchurn
        • preprocess.py
        • arvioida.py
    • sagemaker-pipelines-project.ipynb – Muistikirja, joka käy läpi mallinnuksen työnkulun Pipelinesin uusien ominaisuuksien avulla

Lataa tietojoukko

Jotta voit seurata tätä viestiä, sinun on ladattava ja tallennettava näytetiedosto datakansion alla projektin kotihakemistossa, johon tiedosto tallennetaan Amazonin elastinen tiedostojärjestelmä (Amazon EFS) Studio-ympäristössä.

Rakenna putkilinjan komponentit

Nyt olet valmis rakentamaan putkilinjan komponentit.

Tuo lausekkeita ja ilmoittaa parametrit ja vakiot

Luo Studio-muistikirja nimeltä sagemaker-pipelines-project.ipynb projektin kotihakemistossa. Kirjoita seuraava koodilohko soluun ja suorita solu määrittääksesi SageMaker- ja S3-asiakasobjektit, luo PipelineSessionja määritä S3-säilön sijainti käyttämällä SageMaker-istunnon mukana tulevaa oletussäilöä:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Liukulinjat tukevat parametrointia, jonka avulla voit määrittää syöttöparametreja ajon aikana muuttamatta liukuhihnakoodia. Voit käyttää alla olevia moduuleja sagemaker.workflow.parameters moduuli, kuten ParameterInteger, ParameterFloatja ParameterString, määrittää eri tietotyyppien liukuhihnaparametreja. Suorita seuraava koodi asettaaksesi useita syöttöparametreja:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Luo erätietojoukko

Luo erätietojoukko, jota käytät myöhemmin erämuunnosvaiheessa:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Lataa tiedot S3-ämpäriin

Lataa tietojoukot Amazon S3:een:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Määritä käsittelyskripti ja käsittelyvaihe

Tässä vaiheessa valmistelet Python-komentosarjan ominaisuuksien suunnittelua, yhtä kuumakoodausta varten ja kurkkaat mallin rakentamiseen käytettävät koulutus-, validointi- ja testijaot. Suorita seuraava koodi luodaksesi käsittelyskripti:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Suorita seuraavaksi seuraava koodilohko prosessorin ilmentämiseksi ja Liukulinjat-vaihe suorittaaksesi käsittelykomentosarjan. Koska käsittelyskripti on kirjoitettu Pandas-kielellä, käytät a SKLearnProcessor. Putket ProcessingStep funktio ottaa seuraavat argumentit: prosessori, syötteen S3 sijainnit raakatietojoukoille ja ulostulon S3 sijainnit käsiteltyjen tietojoukkojen tallentamiseksi.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Määrittele harjoitusvaihe

Määritä mallikoulutus käyttämällä SageMaker XGBoost -estimaattoria ja putkia TrainingStep toiminto:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Määritä arviointiskripti ja mallin arviointivaihe

Suorita seuraava koodilohko arvioidaksesi mallin koulutuksen jälkeen. Tämä skripti kapseloi logiikan sen tarkistamiseksi, täyttääkö AUC-pisteet määritetyn kynnyksen.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Suorita seuraavaksi seuraava koodilohko prosessorin ilmentämiseksi ja Liukulinjat-vaihe suorittaaksesi arviointikomentosarjan. Koska arviointikomentosarja käyttää XGBoost-pakettia, käytät a ScriptProcessor yhdessä XGBoost-kuvan kanssa. Putket ProcessingStep funktio ottaa seuraavat argumentit: prosessori, syötteen S3 sijainnit raakatietojoukoille ja ulostulon S3 sijainnit käsiteltyjen tietojoukkojen tallentamiseksi.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Määritä mallin luontivaihe

Suorita seuraava koodilohko luodaksesi SageMaker-mallin käyttämällä Pipelines-mallivaihetta. Tämä vaihe käyttää koulutusvaiheen tulosta mallin pakkaamiseen käyttöönottoa varten. Huomaa, että ilmentymän tyyppiargumentin arvo välitetään käyttämällä aiemmin viestissä määrittämääsi Pipelines-parametria.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Määritä erämuunnosvaihe

Suorita seuraava koodilohko suorittaaksesi erämuunnos harjoitetun mallin avulla ensimmäisessä vaiheessa luodulla eräsyötteellä:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Määritä rekisterimallin vaihe

Seuraava koodi rekisteröi mallin SageMaker-mallirekisteriin käyttämällä Pipelines-mallivaihetta:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Määritä epäonnistumisvaihe putkilinjan pysäyttämiseksi

Seuraava koodi määrittää Liukulinjat epäonnistuvat -vaiheen putkilinjan ajon pysäyttämiseksi virheilmoituksella, jos AUC-pisteet eivät täytä määritettyä kynnysarvoa:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Määritä ehtovaihe tarkistaaksesi AUC-pisteet

Seuraava koodi määrittää ehtovaiheen, jolla tarkistetaan AUC-pisteet ja luodaan ehdollisesti malli ja suoritetaan erämuunnos ja rekisteröidään malli mallirekisteriin tai pysäytetään liukuhihnan ajo epäonnistuneessa tilassa:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Rakenna ja käytä putkistoa

Kun olet määrittänyt kaikki komponenttivaiheet, voit koota ne Pipelines-objektiksi. Liukuhihnan järjestystä ei tarvitse määrittää, koska Pipelines päättelee järjestysjärjestyksen automaattisesti vaiheiden välisten riippuvuuksien perusteella.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Suorita seuraava koodi muistikirjan soluun. Jos liukuhihna on jo olemassa, koodi päivittää liukuhihnan. Jos putkistoa ei ole olemassa, se luo uuden.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Yhteenveto

Tässä viestissä esittelimme joitain uusia ominaisuuksia, jotka ovat nyt saatavilla Pipelinesissä, sekä muita sisäänrakennettuja SageMaker-ominaisuuksia ja XGBoost-algoritmia, joiden avulla voit kehittää, iteroida ja ottaa käyttöön vaihtuvuuden ennustamisen mallin. Ratkaisua voidaan laajentaa lisätietolähteillä

toteuttaaksesi oman ML-työnkulkusi. Lisätietoja Pipelines-työnkulun vaiheista on kohdassa Amazon SageMaker -mallinrakennusputki ja SageMaker-työnkulut. AWS SageMaker -esimerkkejä GitHub-repolla on enemmän esimerkkejä erilaisista Pipelines-käyttötapauksista.


Tietoja Tekijät

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.Jerry PengLisää on AWS SageMakerin ohjelmistokehitysinsinööri. Hän keskittyy kokonaisvaltaisen laajamittaisen MLOps-järjestelmän rakentamiseen koulutuksesta tuotannon malliseurantaan. Hän on myös intohimoinen tuomaan MLOps-konseptin laajemmalle yleisölle.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.Dewen Qi on ohjelmistokehitysinsinööri AWS:ssä. Hän keskittyy tällä hetkellä SageMaker-putkien kehittämiseen ja parantamiseen. Työn ulkopuolella hän nauttii sellon harjoittelusta.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.Gayatri Ghanakota on vanhempi koneoppimisinsinööri, jolla on AWS-asiantuntijapalvelut. Hän on intohimoinen AI/ML-ratkaisujen kehittämiseen, käyttöönottoon ja selittämiseen eri aloilla. Ennen tätä roolia hän johti useita aloitteita tietotutkijana ja ML-insinöörinä rahoitus- ja vähittäiskaupan alan parhaiden globaalien yritysten kanssa. Hänellä on tietojenkäsittelytieteen maisterin tutkinto, joka on erikoistunut tietotieteeseen Coloradon yliopistosta Boulderista.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.Rupinder Grewal on AWS:n Sr Ai/ML Specialist Solutions -arkkitehti. Tällä hetkellä hän keskittyy mallien ja MLO:iden tarjoamiseen SageMakerissa. Ennen tätä roolia hän on työskennellyt koneoppimisinsinöörinä mallien rakentamisessa ja isännöinnissa. Työn ulkopuolella hän pelaa tennistä ja pyöräilee vuoristopoluilla.

Uusia ominaisuuksia Amazon SageMaker Pipelinesille ja Amazon SageMaker SDK:lle PlatoBlockchain Data Intelligencelle. Pystysuuntainen haku. Ai.Ray Li on vanhempi datatutkija, jolla on AWS-asiantuntijapalvelut. Hänen erikoisalansa keskittyy AI/ML-ratkaisujen rakentamiseen ja operoimiseen erikokoisille asiakkaille startupeista yritysorganisaatioihin. Työn ulkopuolella Ray nauttii kuntoilusta ja matkustamisesta.

Aikaleima:

Lisää aiheesta AWS-koneoppiminen