Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK için yeni özellikler

Amazon SageMaker Ardışık Düzenleri veri bilimcilerinin ve makine öğrenimi (ML) mühendislerinin eğitim iş akışlarını otomatikleştirmesine olanak tanır; bu, hızlı deneyler ve modelin yeniden eğitimi için model geliştirme adımlarını düzenlemek üzere tekrarlanabilir bir süreç oluşturmanıza yardımcı olur. Veri hazırlama, özellik mühendisliği, model eğitimi, model ayarlama ve model doğrulama da dahil olmak üzere tüm model oluşturma iş akışını otomatikleştirebilir ve bunu model kaydında kataloglayabilirsiniz. İşlem hatlarını düzenli aralıklarla veya belirli olaylar tetiklendiğinde otomatik olarak çalışacak şekilde yapılandırabilir veya bunları gerektiğinde manuel olarak çalıştırabilirsiniz.

Bu yazıda, geliştirmelerden bazılarını vurgulayacağız. Amazon Adaçayı Yapıcı SDK'yı kullanın ve Amazon SageMaker Pipelines'ın makine öğrenimi uygulayıcılarının makine öğrenimi modelleri oluşturmasını ve eğitmesini kolaylaştıran yeni özelliklerini tanıtın.

Pipelines, geliştirici deneyimini geliştirmeye devam ediyor ve bu son sürümlerle artık hizmeti daha özelleştirilmiş bir şekilde kullanabilirsiniz:

  • 2.99.0, 2.101.1, 2.102.0, 2.104.0 – Güncellenen belgeler PipelineVariable Tahminci, işlemci, tuner, transformatör ve model temel sınıfları, Amazon modelleri ve çerçeve modelleri için kullanım. Tahmincilerin ve işlemcilerin tüm alt sınıflarını desteklemek için SDK'nın daha yeni sürümleriyle birlikte ek değişiklikler gelecektir.
  • 2.90.0 – Kullanılabilirliği ModelAdım entegre model kaynağı oluşturma ve kayıt görevleri için.
  • 2.88.2 – Kullanılabilirliği Boru HattıOturumu SageMaker varlıkları ve kaynaklarıyla yönetilen etkileşim için.
  • 2.88.2 – Alt sınıf uyumluluğu iş akışı ardışık düzeni iş adımları böylece iş soyutlamaları oluşturabilir ve işleri bir işlem hattı olmadan yaptığınız gibi yapılandırıp çalıştırabilir, eğitebilir, dönüştürebilir ve ayarlayabilirsiniz.
  • 2.76.0 – Kullanılabilirliği Başarısız Adım Arıza durumundaki bir işlem hattını koşullu olarak durdurmak için.

Bu yazıda, Pipelines'ın yeni özelliklerinin nasıl uygulanacağını göstermek için model oluşturmaya ve devreye almaya odaklanan örnek bir veri kümesi kullanarak bir iş akışında size yol göstereceğiz. Sonunda, bu yeni özellikleri başarıyla kullanmak ve makine öğrenimi iş yüklerinizi basitleştirmek için yeterli bilgiye sahip olmalısınız.

Özelliklere genel bakış

Pipelines aşağıdaki yeni özellikleri sunar:

  • İşlem hattı değişkeni açıklaması – Bazı yöntem parametreleri birden fazla giriş türünü kabul eder; PipelineVariablesve nerede olduğunu açıklığa kavuşturmak için ek belgeler eklendi PipelineVariables hem SageMaker SDK belgelerinin en son kararlı sürümünde hem de işlevlerin başlangıç ​​imzasında desteklenir. Örneğin, aşağıdaki TensorFlow tahmincisinde başlangıç ​​imzası artık şunu gösteriyor: model_dir ve image_uri destek PipelineVariablesdiğer parametrelerde ise durum böyle değildir. Daha fazla bilgi için bkz. TensorFlow Tahmincisi.
    • Önce:
      TensorFlow(
          py_version=None,
          framework_version=None,
          model_dir=None,
          image_uri=None,
          distribution=None,
          **kwargs,
      )

    • Sonra:
      TensorFlow(
          py_version: Union[str, NoneType] = None,
          framework_version: Union[str, NoneType] = None,
          model_dir: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          image_uri: Union[str, sagemaker.workflow.entities.PipelineVariable, NoneType] = None,
          distribution: Union[Dict[str, str], NoneType] = None,
          compiler_config: Union[sagemaker.tensorflow.training_compiler.config.TrainingCompilerConfig, NoneType] = None,
          **kwargs,
      )

  • Ardışık düzen oturumu - Boru HattıOturumu SageMaker SDK genelinde birlik sağlamak için tanıtılan yeni bir kavramdır ve işlem hattı kaynaklarının yavaş başlatılmasını sağlar (çalıştırma çağrıları yakalanır ancak işlem hattı oluşturulup çalıştırılıncaya kadar çalıştırılmaz). PipelineSession bağlam devralır SageMakerSession ve diğer SageMaker varlıkları ve kaynaklarıyla (eğitim işleri, uç noktalar ve depolanan giriş veri kümeleri gibi) etkileşimde bulunmanız için uygun yöntemler uygular. Amazon Basit Depolama Hizmeti (Amazon S3).
  • İş akışı hattı iş adımlarıyla alt sınıf uyumluluğu – Artık iş soyutlamaları oluşturabilir ve işleri işlem hattı olmadan yaptığınız gibi yapılandırıp çalıştırabilir, eğitebilir, dönüştürebilir ve ayarlayabilirsiniz.
    • Örneğin, bir işleme adımı oluşturmak SKLearnProcessor önceden aşağıdakileri gerektiriyordu:
          sklearn_processor = SKLearnProcessor(
              framework_version=framework_version,
              instance_type=processing_instance_type,
              instance_count=processing_instance_count,
              sagemaker_session=sagemaker_session, #sagemaker_session would be passed as an argument
              role=role,
          )
          step_process = ProcessingStep(
              name="{pipeline-name}-process",
              processor=sklearn_processor,
              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
              ],
              outputs=[
                  ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                  ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
                  ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
              ],
              code=f"code/preprocess.py",
          )

    • Önceki kodda gördüğümüz gibi, ProcessingStep temelde aynı ön işleme mantığını yapması gerekir .run, işi başlatmak için API çağrısını başlatmadan. Ancak artık iş akışı ardışık düzeni iş adımlarıyla alt sınıf uyumluluğu etkinleştirildiğinden, step_args .run ile ön işleme mantığını alan bağımsız değişken, böylece bir iş soyutlaması oluşturabilir ve bunu Pipelines olmadan kullanacağınız şekilde yapılandırabilirsiniz. Biz de geçiyoruz pipeline_session, Hangi bir olduğunu PipelineSession bunun yerine nesne sagemaker_session çalıştırma çağrılarının yakalandığından ancak ardışık düzen oluşturulup çalıştırılana kadar çağrılmadığından emin olmak için. Aşağıdaki koda bakın:
      sklearn_processor = SKLearnProcessor(
          framework_version=framework_version,
          instance_type=processing_instance_type,
          instance_count=processing_instance_count,
          sagemaker_session=pipeline_session,#pipeline_session would be passed in as argument
          role=role,
      )
      
      processor_args = sklearn_processor.run(
          inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
          ],
          outputs=[
              ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
              ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
              ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
          ],
          code=f"code/preprocess.py",
      )
      step_process = ProcessingStep(name="{pipeline-name}-process", step_args=processor_args)

  • Model adımı (model oluşturma ve kayıt adımlarını içeren kolaylaştırılmış bir yaklaşım) –Pipelines, SageMaker modelleriyle entegrasyon için iki adım türü sunar: CreateModelStep ve RegisterModel. Artık her ikisini de yalnızca kullanarak elde edebilirsiniz. ModelStep tip. şunu unutmayın: PipelineSession bunu başarmak için gereklidir. Bu, işlem hattı adımları ile SDK arasında benzerlik sağlar.
    • Önce:
      step_register = RegisterModel(
              name="ChurnRegisterModel",
              estimator=xgb_custom_estimator,
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
              content_types=["text/csv"],
              response_types=["text/csv"],
              inference_instances=["ml.t2.medium", "ml.m5.large"],
              transform_instances=["ml.m5.large"],
              model_package_group_name=model_package_group_name,
              approval_status=model_approval_status,
              model_metrics=model_metrics,
      )

    • Sonra:
      register_args = model.register(
          content_types=["text/csv"],
          response_types=["text/csv"],
          inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
          transform_instances=["ml.m5.xlarge"],
          model_package_group_name=model_package_group_name,
          approval_status=model_approval_status,
          model_metrics=model_metrics,
      )
      step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

  • Başarısız adım (boru hattı çalışmasının koşullu olarak durdurulması) - FailStep model puanının belirli bir eşiğin altında olması gibi bir koşulun karşılanması durumunda bir işlem hattının arıza durumuyla durdurulmasına olanak tanır.

Çözüme genel bakış

Bu çözümde giriş noktanız Amazon SageMaker Stüdyosu Hızlı deneyler için entegre geliştirme ortamı (IDE). Studio, uçtan uca Pipelines deneyimini yönetmek için bir ortam sunar. Studio ile şunları atlayabilirsiniz: AWS Yönetim Konsolu tüm iş akışı yönetiminiz için. Ardışık Düzenleri Studio'dan yönetme hakkında daha fazla bilgi için bkz. SageMaker Studio'da SageMaker İşlem Hatlarını Görüntüleme, İzleme ve Yürütme.

Aşağıdaki şema, yeni özellikleri kullanarak çıkarımları eğitmek ve oluşturmak için farklı adımlarla birlikte makine öğrenimi iş akışının üst düzey mimarisini göstermektedir.

Boru hattı aşağıdaki adımları içerir:

  1. Gerekli özellikleri oluşturmak ve verileri eğitim, doğrulama ve test veri kümelerine bölmek için verileri önceden işleyin.
  2. SageMaker XGBoost çerçevesiyle bir eğitim işi oluşturun.
  3. Test veri kümesini kullanarak eğitilen modeli değerlendirin.
  4. AUC puanının önceden tanımlanmış bir eşiğin üzerinde olup olmadığını kontrol edin.
    • AUC puanı eşikten düşükse ardışık düzen çalışmasını durdurun ve başarısız olarak işaretleyin.
    • AUC puanı eşikten büyükse bir SageMaker modeli oluşturun ve bunu SageMaker model kaydına kaydedin.
  5. Önceki adımda oluşturulan modeli kullanarak verilen veri kümesine toplu dönüşüm uygulayın.

Önkoşullar

Bu gönderiyi takip etmek için bir AWS hesabına ihtiyacınız var. Stüdyo alanı.

İşlem hatları doğrudan SageMaker varlıkları ve kaynaklarıyla entegre olduğundan diğer AWS hizmetleriyle etkileşim kurmanıza gerek kalmaz. Ayrıca, tam olarak yönetilen bir hizmet olduğundan herhangi bir kaynağı yönetmenize de gerek yoktur; bu, kaynakları sizin için oluşturup yönettiği anlamına gelir. Hem bağımsız Python API'leri hem de Studio'nun entegre bileşenleri olan çeşitli SageMaker bileşenleri hakkında daha fazla bilgi için bkz. SageMaker ürün sayfası.

Başlamadan önce aşağıdaki kod parçacığını kullanarak SageMaker SDK sürümünü >= 2.104.0 ve xlrd >=1.0.0'ı Studio not defterine yükleyin:

print(sagemaker.__version__)
import sys
!{sys.executable} -m pip install "sagemaker>=2.104.0"
!{sys.executable} -m pip install "xlrd >=1.0.0"
 
import sagemaker

Makine öğrenimi iş akışı

Bu yazı için aşağıdaki bileşenleri kullanacaksınız:

  • Veri Hazırlama
    • SageMaker İşleme – SageMaker Processing, makine öğrenimi iş yükleri için özel veri dönüşümleri ve özellik mühendisliği çalıştırmanıza olanak tanıyan, tam olarak yönetilen bir hizmettir.
  • Model oluşturma
  • Model eğitimi ve değerlendirmesi
    • Tek tıkla eğitim – SageMaker dağıtılmış eğitim özelliği. SageMaker, veri paralelliği ve model paralelliği için dağıtılmış eğitim kütüphaneleri sağlar. Kitaplıklar SageMaker eğitim ortamı için optimize edilmiştir, dağıtılmış eğitim işlerinizi SageMaker'a uyarlamanıza yardımcı olur ve eğitim hızını ve verimi artırır.
    • SageMaker Deneyleri – Deneyler, SageMaker'ın makine öğrenimi yinelemelerinizi düzenlemenize, izlemenize, karşılaştırmanıza ve değerlendirmenize olanak tanıyan bir yeteneğidir.
    • SageMaker toplu dönüşümü – Toplu dönüştürme veya çevrimdışı puanlama, SageMaker'da makine öğrenimi modellerinizi kullanarak daha büyük bir veri kümesi üzerinde tahmin yapmanıza olanak tanıyan yönetilen bir hizmettir.
  • İş akışı düzenlemesi

SageMaker işlem hattı, JSON işlem hattı tanımıyla tanımlanan bir dizi birbirine bağlı adımdan oluşur. Yönlendirilmiş bir döngüsel olmayan grafik (DAG) kullanarak bir boru hattını kodlar. DAG, işlem hattının her adımına ilişkin gereksinimler ve bunlar arasındaki ilişkiler hakkında bilgi verir ve yapısı, adımlar arasındaki veri bağımlılıklarına göre belirlenir. Bu bağımlılıklar, bir adımın çıktısının özellikleri başka bir adıma girdi olarak aktarıldığında oluşturulur.

Aşağıdaki diyagram, adımlar arasındaki bağlantıların, adım tanımlarıyla tanımlanan giriş ve çıkışlara dayalı olarak SageMaker tarafından çıkarıldığı SageMaker hattındaki farklı adımları (bir kayıp tahmini kullanım durumu için) göstermektedir.

Sonraki bölümlerde işlem hattının her adımının oluşturulması ve oluşturulduktan sonra işlem hattının tamamının çalıştırılması açıklanmaktadır.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.

Proje yapısı

Proje yapısıyla başlayalım:

  • /sm-boru hatları-uçtan uca-örnek – Proje adı
    • / veri – Veri kümeleri
    • /boru hatları – Boru hattı bileşenlerinin kod dosyaları
      • /müşteri kaybı
        • önişlem.py
        • değerlendirmek.py
    • sagemaker-pipelines-project.ipynb – Pipelines'ın yeni özelliklerini kullanarak modelleme iş akışını gösteren bir not defteri

Veri kümesini indirin

Bu gönderiyi takip etmek için, indirmeniz ve kaydetmeniz gerekir. örnek veri kümesi dosyayı kaydeden proje ana dizini içindeki veri klasörü altında Amazon Elastik Dosya Sistemi (Amazon EFS) Studio ortamında.

Boru hattı bileşenlerini oluşturun

Artık boru hattı bileşenlerini oluşturmaya hazırsınız.

İfadeleri içe aktarın ve parametreleri ve sabitleri bildirin

Adlı bir Studio not defteri oluşturun sagemaker-pipelines-project.ipynb proje ana dizini içinde. Aşağıdaki kod bloğunu bir hücreye girin ve SageMaker ve S3 istemci nesnelerini ayarlamak için hücreyi çalıştırın, PipelineSessionve SageMaker oturumuyla birlikte gelen varsayılan klasörü kullanarak S3 klasör konumunu ayarlayın:

import boto3
import pandas as pd
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
 
s3_client = boto3.resource('s3')
pipeline_name = f"ChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"

İşlem hatları, işlem hattı kodunuzu değiştirmeden çalışma zamanında giriş parametrelerini belirtmenize olanak tanıyan parametreleştirmeyi destekler. altında bulunan modülleri kullanabilirsiniz. sagemaker.workflow.parameters modül, örneğin ParameterInteger, ParameterFloat, ve ParameterString, çeşitli veri türlerinin ardışık düzen parametrelerini belirtmek için. Birden çok giriş parametresini ayarlamak için aşağıdaki kodu çalıştırın:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
auc_score_threshold = 0.75
base_job_prefix = "churn-example"
model_package_group_name = "churn-job-model-packages"
batch_data = "s3://{}/data/batch/batch.csv".format(default_bucket)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
input_data = ParameterString(
    name="InputData",
    default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

Toplu veri kümesi oluşturma

Daha sonra toplu dönüştürme adımında kullanacağınız toplu veri kümesini oluşturun:

def preprocess_batch_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df
    
# convert the store_data file into csv format
store_data = pd.read_excel("data/storedata_total.xlsx")
store_data.to_csv("data/storedata_total.csv")
 
# preprocess batch data and save into the data folder
batch_data = preprocess_batch_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

Verileri bir S3 paketine yükleyin

Veri kümelerini Amazon S3'e yükleyin:

s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")

Bir işleme komut dosyasını ve işleme adımını tanımlayın

Bu adımda, özellik mühendisliği, bir sıcak kodlama yapmak ve model oluşturma için kullanılacak eğitim, doğrulama ve test bölümlerini düzenlemek için bir Python betiği hazırlarsınız. İşleme komut dosyanızı oluşturmak için aşağıdaki kodu çalıştırın:

%%writefile pipelines/customerchurn/preprocess.py

import os
import tempfile
import numpy as np
import pandas as pd
import datetime as dt
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"
    #Read Data
    df = pd.read_csv(
        f"{base_dir}/input/storedata_total.csv"
    )
    # convert created column to datetime
    df["created"] = pd.to_datetime(df["created"])
    #Convert firstorder and lastorder to datetime datatype
    df["firstorder"] = pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    #Drop Rows with Null Values
    df = df.dropna()
    #Create column which gives the days between the last order and the first order
    df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days
    #Create column which gives the days between the customer record was created and the first order
    df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days
    #Drop columns
    df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)
    #Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])
    # Split into train, validation and test datasets
    y = df.pop("retained")
    X_pre = df
    y_pre = y.to_numpy().reshape(len(y), 1)
    X = np.concatenate((y_pre, X_pre), axis=1)
    np.random.shuffle(X)
    # Split in Train, Test and Validation Datasets
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
    train_rows = np.shape(train)[0]
    validation_rows = np.shape(validation)[0]
    test_rows = np.shape(test)[0]
    train = pd.DataFrame(train)
    test = pd.DataFrame(test)
    validation = pd.DataFrame(validation)
    # Convert the label column to integer
    train[0] = train[0].astype(int)
    test[0] = test[0].astype(int)
    validation[0] = validation[0].astype(int)
    # Save the Dataframes as csv files
    train.to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Daha sonra, işlemciyi başlatmak için aşağıdaki kod bloğunu ve işleme betiğini çalıştırmak için Pipelines adımını çalıştırın. İşleme komut dosyası Pandas'ta yazıldığından, bir SKLearnİşlemci. Boru Hatları ProcessingStep işlevi şu bağımsız değişkenleri alır: işlemci, ham veri kümeleri için giriş S3 konumları ve işlenmiş veri kümelerini kaydetmek için çıkış S3 konumları.

# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")

# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"s3://{default_bucket}/input/code/preprocess.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

Bir eğitim adımı tanımlayın

SageMaker XGBoost tahmin aracını ve Pipelines'ı kullanarak model eğitimi ayarlayın TrainingStep işlevi:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
)
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="ChurnModelTrain",
    step_args=train_args,
    )

Değerlendirme komut dosyasını ve model değerlendirme adımını tanımlayın

Modeli eğittikten sonra değerlendirmek için aşağıdaki kod bloğunu çalıştırın. Bu komut dosyası, AUC puanının belirtilen eşiği karşılayıp karşılamadığını kontrol etme mantığını kapsar.

%%writefile pipelines/customerchurn/evaluate.py

import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost
import datetime as dt
from sklearn.metrics import roc_curve,auc
if __name__ == "__main__":   
    #Read Model Tar File
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = pickle.load(open("xgboost-model", "rb"))
    #Read Test Data using which we evaluate the model
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)
    #Run Predictions
    predictions = model.predict(X_test)
    #Evaluate Predictions
    fpr, tpr, thresholds = roc_curve(y_test, predictions)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }
    #Save Evaluation Report
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Ardından, işlemciyi başlatmak için aşağıdaki kod bloğunu ve değerlendirme betiğini çalıştırmak için İşlem Hatları adımını çalıştırın. Değerlendirme komut dosyası XGBoost paketini kullandığından, ScriptProcessor XGBoost görüntüsüyle birlikte. Boru Hatları ProcessingStep işlevi şu bağımsız değişkenleri alır: işlemci, ham veri kümeleri için giriş S3 konumları ve işlenmiş veri kümelerini kaydetmek için çıkış S3 konumları.

#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
from sagemaker.processing import ScriptProcessor
# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"s3://{default_bucket}/input/code/evaluate.py",
)
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

Model oluşturma adımını tanımlama

İşlem Hatları modeli adımını kullanarak bir SageMaker modeli oluşturmak için aşağıdaki kod bloğunu çalıştırın. Bu adım, modeli dağıtım için paketlemek üzere eğitim adımının çıktısını kullanır. Örnek türü bağımsız değişkeninin değerinin, gönderide daha önce tanımladığınız Pipelines parametresi kullanılarak aktarıldığını unutmayın.

from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
# step to create model 
model = Model(
    image_uri=image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

Toplu dönüştürme adımı tanımlama

İlk adımda oluşturulan toplu girdiyle eğitilmiş modeli kullanarak toplu dönüştürmeyi çalıştırmak için aşağıdaki kod bloğunu çalıştırın:

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
    sagemaker_session=pipeline_session
)
                                 
step_transform = TransformStep(
    name="ChurnTransform", 
    step_args=transformer.transform(
                    data=batch_data,
                    content_type="text/csv"
                 )
)

Kayıt modeli adımını tanımlayın

Aşağıdaki kod, Pipelines model adımını kullanarak modeli SageMaker model kaydına kaydeder:

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

İşlem hattını durdurmak için başarısız bir adım tanımlayın

Aşağıdaki kod, AUC puanı tanımlanan eşiği karşılamıyorsa işlem hattı çalışmasını bir hata mesajıyla durdurmak için İşlem Hatları başarısız adımını tanımlar:

from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
step_fail = FailStep(
    name="ChurnAUCScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", auc_score_threshold]),
    )

AUC puanını kontrol etmek için bir koşul adımı tanımlayın

Aşağıdaki kod, AUC puanını kontrol etmek ve koşullu olarak bir model oluşturmak ve toplu dönüşüm çalıştırmak ve model kayıt defterine bir model kaydetmek veya başarısız bir durumda işlem hattı çalıştırmasını durdurmak için bir koşul adımını tanımlar:

from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

İşlem hattını oluşturun ve çalıştırın

Tüm bileşen adımlarını tanımladıktan sonra bunları bir Pipelines nesnesinde birleştirebilirsiniz. İşlem hattının sırasını belirtmeniz gerekmez çünkü İşlem Hatları, adımlar arasındaki bağımlılıklara göre sipariş sırasını otomatik olarak çıkarır.

import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)

Aşağıdaki kodu not defterinizdeki bir hücrede çalıştırın. İşlem hattı zaten mevcutsa kod, işlem hattını güncelleştirir. Boru hattı mevcut değilse yenisini oluşturur.

pipeline.start()
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sagemaker_role)
# start Pipeline execution

Sonuç

Bu yazıda, kayıp tahmini için bir model geliştirmek, yinelemek ve dağıtmak amacıyla diğer yerleşik SageMaker özellikleri ve XGBoost algoritmasının yanı sıra Pipelines'ta şu anda mevcut olan bazı yeni özellikleri tanıttık. Çözüm ek veri kaynaklarıyla genişletilebilir

kendi ML iş akışınızı uygulamak için. İşlem Hatları iş akışındaki mevcut adımlar hakkında daha fazla ayrıntı için bkz. Amazon SageMaker Model Oluşturma İşlem Hattı ve SageMaker İş Akışları. AWS SageMaker Örnekleri GitHub deposunda Pipelines'ın kullanıldığı çeşitli kullanım durumları hakkında daha fazla örnek var.


Yazarlar Hakkında

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.Jerry Peng AWS SageMaker'da yazılım geliştirme mühendisidir. Üretimde eğitimden model izlemeye kadar uçtan uca büyük ölçekli MLOps sistemi oluşturmaya odaklanıyor. Aynı zamanda MLOps kavramını daha geniş bir kitleye ulaştırma konusunda da tutkulu.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.Dewen Qi'si AWS'de Yazılım Geliştirme Mühendisidir. Şu anda SageMaker Pipelines'ı geliştirmeye ve iyileştirmeye odaklanıyor. İş dışında Çello çalışmaktan hoşlanıyor.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.Gayatri Ganakota AWS Professional Services ile Kıdemli Makine Öğrenimi Mühendisidir. Çeşitli alanlarda AI/ML çözümlerini geliştirme, dağıtma ve açıklama konusunda tutkulu. Bu görevden önce, finans ve perakende alanındaki en iyi küresel firmalarda veri bilimcisi ve makine öğrenimi mühendisi olarak birden fazla girişimi yönetti. Colorado Üniversitesi, Boulder'dan Veri Bilimi alanında Bilgisayar Bilimi alanında yüksek lisans derecesine sahiptir.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.Ruinder Grewal AWS'li bir Kıdemli Ai/ML Uzman Çözüm Mimarıdır. Şu anda SageMaker'da modellerin ve MLO'ların sunumuna odaklanıyor. Bu görevden önce Makine Öğrenimi Mühendisi olarak model oluşturma ve barındırma konusunda çalıştı. İş dışında tenis oynamayı ve dağ yollarında bisiklet sürmeyi sever.

Amazon SageMaker Pipelines ve Amazon SageMaker SDK PlatoBlockchain Veri Zekası için yeni özellikler. Dikey Arama. Ai.ray li AWS Profesyonel Hizmetlerinde Kıdemli Veri Bilimcisidir. Uzmanlık alanı, yeni kurulan şirketlerden kurumsal organizasyonlara kadar farklı boyutlardaki müşteriler için AI/ML çözümlerini oluşturmaya ve operasyonel hale getirmeye odaklanıyor. Ray, iş dışında spor yapmaktan ve seyahat etmekten hoşlanıyor.

Zaman Damgası:

Den fazla AWS Makine Öğrenimi