Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.

Amazon SageMaker Pipelinesi ja Amazon SageMaker SDK uued funktsioonid

Amazon SageMakeri torujuhtmed võimaldab andmeteadlastel ja masinõppe (ML) inseneridel koolituse töövooge automatiseerida, mis aitab teil luua korratavat protsessi, et korraldada mudeli arendamise etappe kiireks katsetamiseks ja mudelite ümberõppeks. Saate automatiseerida kogu mudeli koostamise töövoo, sealhulgas andmete ettevalmistamise, funktsioonide projekteerimise, mudeli koolituse, mudeli häälestamise ja mudeli valideerimise ning kataloogida selle mudeliregistris. Saate konfigureerida konveierid jooksma automaatselt regulaarsete ajavahemike järel või teatud sündmuste käivitumisel, või saate neid vajaduse korral käsitsi käivitada.

Selles postituses toome esile mõned täiustused Amazon SageMaker SDK ja tutvustavad Amazon SageMaker Pipelines'i uusi funktsioone, mis muudavad ML-i praktikute jaoks lihtsamaks ML-mudelite loomise ja koolitamise.

Pipelines jätkab oma arendajakogemuse uuendamist ja nende hiljutiste väljalasetega saate nüüd teenust kohandatud viisil kasutada.

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Uuendatud dokumentatsioon PipelineVariable hindaja, protsessori, tuuneri, trafo ja mudeli baasklasside, Amazoni mudelite ja raamistiku mudelite kasutamine. SDK uuemate versioonidega tehakse täiendavaid muudatusi, et toetada kõiki hindajate ja protsessorite alamklasse.
  • 2.90.0 – Saadavus ModelStep integreeritud mudeliressursi loomise ja registreerimise ülesannete jaoks.
  • 2.88.2 – Saadavus PipelineSession hallatud suhtlemiseks SageMakeri üksuste ja ressurssidega.
  • 2.88.2 - alamklassi ühilduvus töövoo torujuhtme tööetapid nii saate luua töö abstraktsioone ning konfigureerida ja käivitada töötlemis-, koolitus-, teisendus- ja häälestustöid nagu ilma konveierita.
  • 2.76.0 – Saadavus FailStep rikkeseisundiga torujuhtme tingimuslikuks peatamiseks.

Selles postituses tutvustame teile töövoogu, kasutades näidisandmestikku, keskendudes mudeli loomisele ja juurutamisele, et näidata, kuidas Pipelinesi uusi funktsioone rakendada. Lõpuks peaks teil olema piisavalt teavet, et neid uuemaid funktsioone edukalt kasutada ja ML-i töökoormust lihtsustada.

Funktsioonide ülevaade

Pipelines pakub järgmisi uusi funktsioone:

  • Torujuhtme muutuja annotatsioon – Teatud meetodi parameetrid aktsepteerivad mitut sisenditüüpi, sealhulgas PipelineVariables, ja on lisatud täiendavaid dokumente, et selgitada, kus PipelineVariables on toetatud nii SageMaker SDK dokumentatsiooni uusimas stabiilses versioonis kui ka funktsioonide algsignatuuris. Näiteks järgmises TensorFlow hinnangus näitab seda nüüd algsignatuur model_dir ja image_uri toetama PipelineVariables, samas kui teised parameetrid mitte. Lisateabe saamiseks vaadake TensorFlow hindaja.
    • Enne:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Pärast:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Torujuhtme seanss - PipelineSession on uus kontseptsioon, mis on kasutusele võetud SageMaker SDK ühtsuse toomiseks ja tutvustab konveieri ressursside laisakat lähtestamist (käivituskutsed püütakse kinni, kuid neid ei käivitata enne, kui konveier on loodud ja käivitatud). The PipelineSession kontekst pärib SageMakerSession ja rakendab mugavaid meetodeid teiste SageMakeri olemite ja ressurssidega suhtlemiseks, nagu koolitustööd, lõpp-punktid ja sisendandmed, mis on salvestatud Amazoni lihtne salvestusteenus (Amazon S3).
  • Alamklassi ühilduvus töövoo konveieri tööetappidega – Nüüd saate koostada tööde abstraktsioone ning konfigureerida ja käivitada töötlemis-, koolitus-, teisendus- ja häälestustöid nagu ilma konveierita.
    • Näiteks töötlemisetapi loomine rakendusega SKLearnProcessor varem nõudis järgmist:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Nagu näeme eelmises koodis, ProcessingStep peab tegema põhimõtteliselt sama eeltöötlusloogikat nagu .run, lihtsalt ilma töö alustamiseks API-kutset käivitamata. Kuid kuna alamklassi ühilduvus on nüüd töövoo konveieri tööetappidega lubatud, deklareerime step_args argument, mis kasutab .run eeltöötlusloogikat, et saaksite luua töö abstraktsiooni ja konfigureerida seda nii, nagu kasutaksite seda ilma torujuhtmeteta. Läbime ka pipeline_session, mis on a PipelineSession objekti asemel sagemaker_session veendumaks, et käitamiskutsed püütakse kinni, kuid neid ei kutsuta enne, kui konveier on loodud ja käivitatud. Vaadake järgmist koodi:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Mudelite samm (lihtne lähenemisviis mudeli loomise ja registreerimise sammudega) -Pipelines pakub SageMakeri mudelitega integreerimiseks kahte etapitüüpi: CreateModelStep ja RegisterModel. Nüüd saate mõlemat saavutada, kasutades ainult ModelStep tüüp. Pange tähele, et a PipelineSession selle saavutamiseks on vaja. See toob konveieri etappide ja SDK sarnasuse.
    • Enne:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Pärast:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Ebaõnnestunud samm (torujuhtme töö tingimuslik seiskamine) - FailStep võimaldab konveieri peatada rikkeolekuga, kui tingimus on täidetud, näiteks kui mudeli skoor on alla teatud läve.

Lahenduse ülevaade

Selles lahenduses on teie sisenemispunkt Amazon SageMaker Studio integreeritud arenduskeskkond (IDE) kiireks katsetamiseks. Studio pakub keskkonda torujuhtmete täieliku kasutuskogemuse haldamiseks. Studio abil saate sellest mööda minna AWS-i juhtimiskonsool kogu töövoo haldamiseks. Lisateavet torujuhtmete haldamise kohta Studios leiate aadressilt Vaadake, jälgige ja käivitage SageMakeri torujuhtmeid SageMaker Studios.

Järgmine diagramm illustreerib ML-i töövoo kõrgetasemelist arhitektuuri koos uute funktsioonide abil treenimise ja järelduste tegemise erinevate sammudega.

Torujuhe sisaldab järgmisi samme:

  1. Eeltöötlege andmeid vajalike funktsioonide loomiseks ja jagage andmed rongi-, valideerimis- ja katseandmekogumiteks.
  2. Looge SageMaker XGBoost raamistikuga koolitustöö.
  3. Hinnake koolitatud mudelit testiandmestiku abil.
  4. Kontrollige, kas AUC skoor ületab etteantud läve.
    • Kui AUC skoor on künnisest väiksem, peatage torujuhtme töö ja märkige see ebaõnnestunuks.
    • Kui AUC skoor on suurem kui lävi, looge SageMakeri mudel ja registreerige see SageMakeri mudeliregistris.
  5. Rakendage antud andmekogumile pakkteisendus, kasutades eelmises etapis loodud mudelit.

Eeldused

Selle postituse jätkamiseks vajate AWS-i kontot a Stuudio domeen.

Torujuhtmed on integreeritud otse SageMakeri üksuste ja ressurssidega, nii et te ei pea suhtlema muude AWS-teenustega. Samuti ei pea te ressursse haldama, kuna see on täielikult hallatav teenus, mis tähendab, et see loob ja haldab teie eest ressursse. Lisateavet erinevate SageMakeri komponentide kohta, mis on nii iseseisvad Pythoni API-d kui ka Studio integreeritud komponentid, leiate SageMakeri tooteleht.

Enne alustamist installige Studio märkmikusse SageMaker SDK versioon >= 2.104.0 ja xlrd >=1.0.0, kasutades järgmist koodilõiku:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

ML töövoog

Selle postituse jaoks kasutate järgmisi komponente:

  • Andmete ettevalmistamine
    • SageMakeri töötlemine – SageMaker Processing on täielikult hallatav teenus, mis võimaldab teil käitada kohandatud andmete teisendusi ja funktsioonide projekteerimist ML-töökoormuste jaoks.
  • Mudeli ehitamine
  • Mudelite koolitus ja hindamine
    • Treening ühe klõpsuga – SageMakeri levitatud treeningfunktsioon. SageMaker pakub hajutatud õppeteeke andmete paralleelsuse ja mudelite paralleelsuse jaoks. Teegid on optimeeritud SageMakeri koolituskeskkonna jaoks, aitavad kohandada teie hajutatud treeningtöid SageMakeriga ning parandavad treeningu kiirust ja läbilaskevõimet.
    • SageMakeri katsed – Eksperimendid on SageMakeri võimalus, mis võimaldab teil korraldada, jälgida, võrrelda ja hinnata oma ML-iteratsioone.
    • SageMakeri partii teisendus – Partii teisendus või võrguühenduseta hindamine on SageMakeris hallatav teenus, mis võimaldab teil oma ML-mudeleid kasutades ennustada suuremat andmestikku.
  • Töövoo orkestreerimine

SageMakeri konveier on JSON-i konveieri määratlusega määratletud omavahel ühendatud sammude jada. See kodeerib konveieri, kasutades suunatud atsüklilist graafikut (DAG). DAG annab teavet konveieri iga etapi nõuete ja suhete kohta ning selle struktuuri määravad sammudevahelised andmete sõltuvused. Need sõltuvused luuakse siis, kui sammu väljundi atribuudid edastatakse sisendina teisele sammule.

Järgnev diagramm illustreerib erinevaid samme SageMakeri konveieris (kärbimise ennustamise kasutusjuhul), kus sammudevahelised seosed järeldab SageMaker sammude määratlustega määratletud sisendite ja väljundite põhjal.

Järgmistes jaotistes kirjeldatakse torujuhtme iga etapi loomist ja kogu torujuhtme käivitamist.

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.

Projekti ülesehitus

Alustame projekti struktuurist:

  • /sm-pipelines-end-to-end-example – projekti nimi
    • / andmed – Andmekogumid
    • /torujuhtmed – torujuhtme komponentide koodifailid
      • /kliendisuud
        • preprocess.py
        • hindama.py
    • sagemaker-pipelines-project.ipynb – Pipelinesi uute funktsioonide abil modelleerimise töövoo läbiv märkmik

Laadige alla andmestik

Selle postituse jätkamiseks peate alla laadima ja salvestama näidisandmekogum projekti kodukataloogi andmekausta all, kuhu fail salvestatakse Amazon elastne failisüsteem (Amazon EFS) Studio keskkonnas.

Ehitage torujuhtme komponendid

Nüüd olete valmis torujuhtme komponente ehitama.

Impordi laused ning deklareeri parameetrid ja konstandid

Looge Studio märkmik nimega sagemaker-pipelines-project.ipynb projekti kodukataloogis. Sisestage lahtrisse järgmine koodiplokk ja käivitage lahter, et seadistada SageMakeri ja S3 kliendiobjektid, luua PipelineSessionja seadistage S3 ämbri asukoht, kasutades SageMakeri seansiga kaasas olevat vaikeämbrit:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

Konveierid toetavad parameetrite määramist, mis võimaldab teil määrata sisendparameetrid käitusajal ilma konveieri koodi muutmata. Saate kasutada allolevaid mooduleid sagemaker.workflow.parameters moodul, näiteks ParameterInteger, ParameterFloatja ParameterString, et määrata erinevate andmetüüpide konveieri parameetreid. Mitme sisendparameetri seadistamiseks käivitage järgmine kood:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Looge pakettandmekogum

Looge pakettandmekomplekt, mida kasutate hiljem partii teisendamise etapis.

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Laadige andmed üles S3 ämbrisse

Laadige andmestikud Amazon S3 üles:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Määratlege töötlemise skript ja töötlemise etapp

Selles etapis valmistate ette Pythoni skripti funktsioonide projekteerimiseks, ühe kuumkodeeringu tegemiseks ning mudelite ehitamiseks kasutatava koolituse, valideerimise ja testijaotiste kureerimiseks. Käivitage töötlemisskripti koostamiseks järgmine kood:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Järgmisena käivitage protsessori käivitamiseks järgmine koodiplokk ja töötlemisskripti käivitamiseks samm Pipelines. Kuna töötlemisskript on kirjutatud Pandas, kasutate a SKLearnProcessor. Torujuhtmed ProcessingStep Funktsioon kasutab järgmisi argumente: protsessor, sisend-S3 asukohad töötlemata andmekogumite jaoks ja väljund S3 asukohad töödeldud andmekogumite salvestamiseks.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Määrake koolitusetapp

Seadistage mudelikoolitus, kasutades SageMaker XGBoost hindajat ja torujuhtmeid TrainingStep funktsioon:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Määratlege hindamisskript ja mudeli hindamise samm

Käivitage järgmine koodiplokk, et hinnata mudelit pärast koolitamist. See skript sisaldab loogikat, et kontrollida, kas AUC skoor vastab määratud lävele.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Järgmisena käivitage protsessori käivitamiseks järgmine koodiplokk ja hindamisskripti käivitamiseks samm Pipelines. Kuna hindamisskript kasutab paketti XGBoost, kasutate a ScriptProcessor koos XGBoosti pildiga. Torujuhtmed ProcessingStep Funktsioon kasutab järgmisi argumente: protsessor, sisend-S3 asukohad töötlemata andmekogumite jaoks ja väljund S3 asukohad töödeldud andmekogumite salvestamiseks.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Määrake mudeli loomise samm

Käivitage järgmine koodiplokk, et luua SageMakeri mudel, kasutades torujuhtmete mudeli sammu. See samm kasutab väljaõppeetapi väljundit mudeli juurutamiseks pakendamiseks. Pange tähele, et eksemplari tüübi argumendi väärtus edastatakse varem postituses määratletud parameetri Pipelines abil.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Määrake partii teisenduse samm

Käivitage järgmine koodiplokk, et käivitada partii teisendus, kasutades koolitatud mudelit esimeses etapis loodud partii sisendiga:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Määratlege registrimudeli samm

Järgmine kood registreerib mudeli SageMakeri mudeliregistris, kasutades Pipelines mudeli sammu:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

Määrake torujuhtme peatamiseks ebaõnnestumise samm

Järgmine kood määratleb torujuhtmete ebaõnnestumise sammu, et peatada konveieri käitamine veateate saatel, kui AUC skoor ei vasta määratletud lävele.

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

Määratlege AUC skoori kontrollimise tingimus

Järgmine kood määratleb tingimuse sammu AUC skoori kontrollimiseks ja tingimusliku mudeli loomiseks ja partii teisenduse käivitamiseks ja mudeli registreerimiseks mudeliregistris või konveieri töö peatamiseks ebaõnnestunud olekus:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

Ehitage ja käivitage torujuhe

Pärast kõigi komponentide etappide määratlemist saate need koguda torujuhtmete objektiks. Te ei pea määrama konveieri järjekorda, kuna torujuhtmed järeldab automaatselt järjestuse jada sammude vaheliste sõltuvuste põhjal.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Käivitage märkmiku lahtris järgmine kood. Kui konveier on juba olemas, värskendab kood konveieri. Kui torujuhet pole, loob see uue.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Järeldus

Selles postituses tutvustasime mõningaid uusi funktsioone, mis on nüüd saadaval koos torujuhtmetega, koos muude sisseehitatud SageMakeri funktsioonide ja XGBoosti algoritmiga, et arendada, itereerida ja juurutada lünga ennustamise mudelit. Lahendust saab täiendada täiendavate andmeallikatega

oma ML-i töövoo rakendamiseks. Lisateavet torujuhtmete töövoos saadaolevate sammude kohta leiate artiklist Amazon SageMakeri mudeli ehitamise torujuhe ja SageMakeri töövood. AWS SageMakeri näited GitHubi repol on rohkem näiteid torujuhtmete kasutamise erinevate kasutusjuhtude kohta.


Autoritest

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.Jerry Peng on tarkvaraarenduse insener koos AWS SageMakeriga. Ta keskendub täieliku suuremahulise MLOps-süsteemi ehitamisele alates koolitusest kuni mudeli jälgimiseni tootmises. Samuti on ta kirglik MLOpside kontseptsiooni laiemale publikule toomise vastu.

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.Dewen Qi on AWS-i tarkvaraarenduse insener. Praegu keskendub ta SageMakeri torujuhtmete arendamisele ja täiustamisele. Väljaspool tööd naudib ta tšellot.

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.Gayatri Ghanakota on AWS-i professionaalsete teenustega vanem masinõppeinsener. Ta on kirglik AI/ML-lahenduste arendamiseks, juurutamiseks ja selgitamiseks erinevates valdkondades. Enne seda rolli juhtis ta mitmeid algatusi andmeteadlase ja ML-insenerina finants- ja jaemüügivaldkonna tippettevõtetes. Tal on Colorado ülikoolis Boulderis andmeteadusele spetsialiseerunud arvutiteaduse magistrikraad.

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.Rupinder Grewal on AWS-iga Sr Ai/ML spetsialistide lahenduste arhitekt. Praegu keskendub ta mudelite ja MLO-de teenindamisele SageMakeris. Enne seda rolli on ta töötanud masinõppeinsenerina mudelite ehitamise ja hostimise kohta. Töövälisel ajal meeldib talle tennist mängida ja mägiradadel jalgrattaga sõita.

Uued funktsioonid Amazon SageMaker Pipelines ja Amazon SageMaker SDK PlatoBlockchain Data Intelligence jaoks. Vertikaalne otsing. Ai.Ray Li on AWS-i professionaalsete teenustega vanem andmeteadlane. Tema eriala keskendub AI/ML-lahenduste loomisele ja kasutuselevõtule erineva suurusega klientidele alates idufirmadest kuni ettevõtete organisatsioonideni. Väljaspool tööd naudib Ray treenimist ja reisimist.

Ajatempel:

Veel alates AWS-i masinõpe