Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

Amazon SageMaker Autopilotを使用して、共有の自転車とスクーターの分類モデルを自動化します

Amazon SageMakerオートパイロット 組織は、わずか数行のコードだけで、エンドツーエンドの機械学習 (ML) モデルと推論パイプラインを迅速に構築してデプロイできるようになります。 コードなしで まったく一緒に Amazon SageMakerスタジオ。 Autopilot は、インフラストラクチャの構成にかかる重労働と、機能エンジニアリング、モデルの選択、ハイパーパラメーターの調整などのパイプライン全体の構築にかかる時間を軽減します。

この投稿では、Autopilot を使用して、生データから堅牢で完全にデプロイされた推論パイプラインに移行する方法を示します。

ソリューションの概要

を使用しております Lyft の自転車シェアリングに関する公開データセット このシミュレーションでは、ユーザーがイベントに参加するかどうかを予測します。 みんなのバイクシェアプログラム。 これは単純な二項分類問題です。

私たちは、Bike Share for All プログラムへの参加に基づいてユーザーを分類するための自動化されたリアルタイム推論パイプラインを構築することがいかに簡単であるかを紹介したいと考えています。 この目的を達成するために、サンフランシスコ ベイエリアで運営されている架空のバイクシェア会社のエンドツーエンドのデータ取り込みと推論パイプラインをシミュレートします。

アーキテクチャは、取り込みパイプラインと推論パイプラインの XNUMX つの部分に分かれています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

この投稿の最初のセクションでは主に ML パイプラインに焦点を当て、XNUMX 番目のセクションではデータ取り込みパイプラインを確認します。

前提条件

この例に従うには、次の前提条件を満たしている必要があります。

  1. 新しい SageMaker ノートブック インスタンスを作成する.
  2. 作る Amazon Kinesis データ ファイアホース 配信ストリーム AWSラムダ 変換機能。 手順については、を参照してください。 AWS Lambda を使用した Amazon Kinesis Firehose データ変換。 このステップはオプションであり、データ ストリーミングをシミュレートする場合にのみ必要です。

データの探索

公開されているデータセットをダウンロードして視覚化しましょう。 Amazon シンプル ストレージ サービス (Amazon S3) バケットと静的 Web サイト:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

次のスクリーンショットは、変換前のデータのサブセットを示しています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

データの最後の列には、予測するターゲットが含まれています。これは、Yes または No の値を取るバイナリ変数で、ユーザーが Bike Share for All プログラムに参加しているかどうかを示します。

データの不均衡に対するターゲット変数の分布を見てみましょう。

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

上のグラフに示されているように、プログラムに参加する人が少なく、データは不均衡です。

過剰表現の偏りを防ぐためにデータのバランスを取る必要があります。 Autopilot はクラスの不均衡を自動的に処理する内部アプローチも提供しており、デフォルトでは F1 スコア検証メトリックが使用されるため、このステップはオプションです。 さらに、データのバランスを自分で調整することを選択した場合は、クラスの不均衡を処理するためのより高度な手法を使用できます。 スモート or GAN.

この投稿では、データ バランシング手法として多数派クラス (いいえ) をダウンサンプリングします。

次のコードはデータを強化し、過剰表現されたクラスをアンダーサンプリングします。

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

バイナリのターゲット値を含め、カテゴリ特徴量を意図的にエンコードしないままにしました。 これは、次のセクションで説明するように、Autopilot が自動機能エンジニアリングとパイプライン展開の一環としてデータのエンコードとデコードを処理してくれるためです。

次のスクリーンショットは、データのサンプルを示しています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

次のグラフのデータは、ご想像のとおり、朝の時間帯と午後のラッシュ時間帯の XNUMX つのピークを表す二峰性分布を持ち、それ以外は正常に見えます。 週末や夜間も活動が少ないのが観察されます。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

次のセクションでは、Autopilot にデータをフィードして、実験を実行できるようにします。

二項分類モデルを構築する

オートパイロットでは、入力および出力先のバケットを指定する必要があります。 入力バケットを使用してデータをロードし、出力バケットを使用して、特徴量エンジニアリングや生成された Jupyter ノートブックなどの成果物を保存します。 トレーニングが完了した後、モデルのパフォーマンスを評価および検証するためにデータセットの 5% を保持し、データセットの 95% を S3 入力バケットにアップロードします。 次のコードを参照してください。

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

データを入力先にアップロードしたら、Autopilot を開始します。

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

実験を開始するために必要なのは、fit() メソッドを呼び出すことだけです。 オートパイロットには、必須パラメーターとして入力および出力 S3 の場所とターゲット属性列が必要です。 機能の処理後、Autopilot が呼び出します SageMaker自動モデル調整 データセットに対して多くのトレーニング ジョブを実行して、モデルの最適なバージョンを見つけます。 オプションの max_candidates パラメーターを追加して、候補の数を 30 に制限しました。これは、最適なモデルを見つけるために、オートパイロットがアルゴリズムとハイパーパラメーターのさまざまな組み合わせで起動するトレーニング ジョブの数です。 このパラメーターを指定しない場合、デフォルトの 250 になります。

次のコードを使用して Autopilot の進行状況を観察できます。

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

トレーニングが完了するまでには時間がかかります。 実行中に、Autopilot ワークフローを見てみましょう。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

最適な候補を見つけるには、次のコードを使用します。

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

次のスクリーンショットは、出力を示しています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

私たちのモデルは 96% の検証精度を達成したので、それをデプロイします。 精度が一定のレベルを超えた場合にのみモデルを使用するなどの条件を追加できます。

推論パイプライン

モデルをデプロイする前に、最適な候補とパイプラインで何が起こっているかを調べてみましょう。 次のコードを参照してください。

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

次の図は出力を示しています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

Autopilot はモデルを構築し、それを XNUMX つの異なるコンテナーにパッケージ化し、それぞれが特定のタスク (変換、予測、逆変換) を順番に実行します。 この多段階の推論は、 SageMaker 推論パイプライン。

マルチステップ推論では、複数の推論モデルを連鎖させることもできます。 たとえば、XNUMX つのコンテナで実行できるのは、 主成分分析 データを XGBoost コンテナに渡す前に。

推論パイプラインをエンドポイントにデプロイする

導入プロセスには、わずか数行のコードが含まれます。

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

プレディクターを使用して予測用のエンドポイントを構成しましょう。

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

エンドポイントと予測子の準備ができたので、確保しておいたテスト データを使用してモデルの精度をテストします。 まず、データを一度に XNUMX 行ずつ推論エンドポイントに送信し、代わりに予測を取得するユーティリティ関数を定義します。 なぜなら、私たちは XGブースト モデルでは、CSV 行をエンドポイントに送信する前にターゲット変数を削除します。 さらに、ファイルをループする前にテスト CSV からヘッダーを削除しました。これは、SageMaker 上の XGBoost のもう XNUMX つの要件でもあります。 次のコードを参照してください。

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

次のスクリーンショットは、出力を示しています。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

次に、モデルの精度を計算してみましょう。
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。

次のコードを参照してください。

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

92% の精度が得られます。 これは検証ステップで得られた 96% よりわずかに低いですが、それでも十分に高い値です。 テストは新しいデータセットを使用して実行されるため、精度がまったく同じになるとは期待できません。

データの取り込み

データを直接ダウンロードし、トレーニング用に構成しました。 実際には、エッジ デバイスからデータ レイクにデータを直接送信し、SageMaker にデータ レイクからノートブックにデータを直接ロードさせる必要がある場合があります。

Kinesis Data Firehose は優れたオプションであり、ストリーミング データをデータレイク、データストア、分析ツールに確実にロードするための最も簡単な方法です。 ストリーミング データをキャプチャ、変換し、Amazon S3 やその他の AWS データ ストアにロードできます。

このユースケースでは、Lambda 変換関数を使用して Kinesis Data Firehose 配信ストリームを作成し、ストリームを通過するときに軽量のデータ クリーニングを実行します。 次のコードを参照してください。

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

この Lambda 関数は、デバイスからデータレイクにストリーミングされたデータの簡易変換を実行します。 CSV 形式のデータ ファイルが必要です。

取り込みステップでは、データをダウンロードし、Lambda 変換関数を使用して Kinesis Data Firehose と S3 データレイクへのデータ ストリームをシミュレートします。

数行のストリーミングをシミュレートしてみましょう。

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

クリーンアップ

コストを最小限に抑えるために、この演習で使用したすべてのリソースを削除することが重要です。 次のコードは、作成した SageMaker 推論エンドポイントと、アップロードしたトレーニング データとテスト データを削除します。

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

まとめ

ML エンジニア、データ サイエンティスト、ソフトウェア開発者は、ML プログラミングの経験がほとんどなくても、Autopilot を使用して推論パイプラインを構築およびデプロイできます。 Autopilot は、データ サイエンスと ML のベスト プラクティスを使用して、時間とリソースを節約します。 大規模組織は、エンジニアリング リソースをインフラストラクチャ構成からモデルの改善やビジネス ユースケースの解決に移すことができるようになりました。 新興企業や小規模組織は、ML の専門知識がほとんどまたはまったくなくても、機械学習を始めることができます。

SageMaker Autopilot の使用を開始するには、以下を参照してください。 製品ページ または、SageMaker Studio 内で SageMaker Autopilot にアクセスします。

また、SageMaker が提供する他の重要な機能について詳しく学ぶことをお勧めします。 Amazon SageMaker フィーチャーストア、と統合します AmazonSageMakerパイプライン 自動化された ML ワークフローを作成、追加し、機能の検索と検出を行い、再利用します。 データセット内のさまざまなフィーチャまたはターゲット バリアントを使用して、複数のオートパイロット シミュレーションを実行できます。 また、モデルが時間 (時刻や曜日など) または場所、またはその両方の組み合わせに基づいて車両需要を予測しようとする動的な車両割り当て問題としてこれにアプローチすることもできます。


著者について

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。ダグ・ムバヤ データと分析を専門とするシニア ソリューション アーキテクトです。 Doug は AWS パートナーと緊密に連携し、データと分析ソリューションをクラウドに統合するのを支援しています。 Doug のこれまでの経験には、ライドシェアリングおよび食品配達セグメントにおける AWS 顧客のサポートが含まれます。

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligenceを使用して、共有の自転車とスクーターの分類モデルを自動化します。 垂直検索。 愛。バレリオ・ペローネ は、AmazonSageMakerの自動モデルチューニングと自動操縦に取り組んでいる応用科学マネージャーです。

タイムスタンプ:

より多くの AWS機械学習