Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою автопілота Amazon SageMaker

Автопілот Amazon SageMaker дає змогу організаціям швидко створювати та розгортати модель наскрізного машинного навчання (ML) і конвеєр висновків за допомогою лише кількох рядків коду або навіть без коду взагалі с Студія Amazon SageMaker. Автопілот знімає важку роботу з налаштування інфраструктури та часу, необхідного для створення всього конвеєра, включаючи розробку функцій, вибір моделі та налаштування гіперпараметрів.

У цій публікації ми покажемо, як перейти від необроблених даних до надійного та повністю розгорнутого конвеєра висновків за допомогою Autopilot.

Огляд рішення

Ми використовуємо Загальнодоступний набір даних Lyft про велопробіг для цієї симуляції, щоб передбачити, чи бере користувач участь у Програма Bike Share for All. Це проста задача двійкової класифікації.

Ми хочемо продемонструвати, як легко побудувати автоматизований конвеєр у режимі реального часу для класифікації користувачів на основі їх участі в програмі Bike Share for All. З цією метою ми моделюємо наскрізний конвеєр прийому даних і висновків для уявної компанії, яка працює в районі затоки Сан-Франциско.

Архітектура розбита на дві частини: конвеєр прийому та конвеєр виведення.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

У першому розділі цієї публікації ми зосереджуємося насамперед на конвеєрі машинного навчання та розглядаємо конвеєр прийому даних у другій частині.

Передумови

Щоб слідувати цьому прикладу, виконайте такі попередні умови:

  1. Створіть новий екземпляр блокнота SageMaker.
  2. створити Amazon Kinesis Data Firehose потік доставки з ан AWS Lambda функція перетворення. Інструкції див Перетворення даних Amazon Kinesis Firehose за допомогою AWS Lambda. Цей крок необов’язковий і потрібен лише для симуляції потоку даних.

Дослідження даних

Давайте завантажимо та візуалізуємо набір даних, який знаходиться в пабліку Служба простого зберігання Amazon (Amazon S3) сегмент і статичний веб-сайт:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

На наступному знімку екрана показано підмножину даних до перетворення.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Останній стовпець даних містить ціль, яку ми хочемо передбачити, яка є двійковою змінною, яка приймає значення «Так» або «Ні», що вказує, чи бере користувач участь у програмі Bike Share for All.

Давайте подивимося на розподіл нашої цільової змінної для будь-якого дисбалансу даних.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Як показано на графіку вище, дані незбалансовані, у програмі бере участь менше людей.

Нам потрібно збалансувати дані, щоб запобігти упередженню надмірного представлення. Цей крок є необов’язковим, оскільки Autopilot також пропонує внутрішній підхід для автоматичної обробки дисбалансу класів, який за замовчуванням використовує метрику перевірки результатів F1. Крім того, якщо ви вирішите збалансувати дані самостійно, ви можете використовувати більш просунуті методи обробки дисбалансу класів, наприклад ПРИГРИТИ or GAN.

Для цієї публікації ми зменшуємо вибірку більшості (Ні) як техніку балансування даних:

Наступний код збагачує дані та робить недостатню вибірку надмірно представленого класу:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Ми навмисно залишили незакодованими наші категоричні ознаки, включно з нашим двійковим цільовим значенням. Це пояснюється тим, що Autopilot піклується про кодування та декодування даних за нас у рамках автоматичного проектування функцій і розгортання конвеєра, як ми побачимо в наступному розділі.

На наступному знімку екрана показано зразок наших даних.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

В іншому випадку дані на наступних графіках виглядають нормально, з бімодальним розподілом, що представляє два піки для ранкових годин і годин пік після обіду, як і слід було очікувати. Також ми спостерігаємо низьку активність у вихідні та вночі.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

У наступному розділі ми передаємо дані Autopilot, щоб він міг запустити для нас експеримент.

Побудуйте модель бінарної класифікації

Автопілот вимагає, щоб ми вказали вхідні та вихідні сегменти призначення. Він використовує вхідне відро для завантаження даних і вихідне відро для збереження артефактів, таких як розробка функцій і згенеровані блокноти Jupyter. Ми зберігаємо 5% набору даних для оцінки та перевірки ефективності моделі після завершення навчання та завантажуємо 95% набору даних у вхідний сегмент S3. Перегляньте наступний код:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Після того, як ми завантажимо дані в ціль введення, настав час запустити автопілот:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Все, що нам потрібно для початку експерименту, це викликати метод fit(). Для автопілота потрібні вхідне та вихідне розташування S3 та цільовий стовпець атрибутів як обов’язкові параметри. Після обробки функції викликається автопілот Автоматичне налаштування моделі SageMaker щоб знайти найкращу версію моделі, виконавши багато навчальних завдань на вашому наборі даних. Ми додали необов’язковий параметр max_candidates, щоб обмежити кількість кандидатів до 30, тобто кількість навчальних завдань, які Autopilot запускає з різними комбінаціями алгоритмів і гіперпараметрів, щоб знайти найкращу модель. Якщо ви не вкажете цей параметр, за умовчанням він дорівнює 250.

Ми можемо спостерігати за прогресом автопілота за допомогою наступного коду:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

Навчання займає деякий час. Поки він працює, давайте подивимося на робочий процес автопілота.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Щоб знайти найкращого кандидата, використовуйте такий код:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

Наступний знімок екрана показує наш результат.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Наша модель досягла точності перевірки 96%, тому ми збираємося її розгорнути. Ми могли б додати умову, щоб ми використовували модель, лише якщо точність вище певного рівня.

Конвеєр для висновків

Перш ніж ми розгорнемо нашу модель, давайте розглянемо наш найкращий кандидат і те, що відбувається в нашому процесі. Перегляньте наступний код:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Наступна діаграма показує наш результат.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Autopilot створив модель і розмістив її в три різні контейнери, кожен з яких послідовно виконує певне завдання: трансформація, прогнозування та зворотне трансформування. Цей багатоетапний висновок можливий за допомогою a Конвеєр висновку SageMaker.

Багатоетапний логічний висновок також може ланцюгувати кілька моделей висновків. Наприклад, може працювати один контейнер аналіз основних компонентів перед передачею даних у контейнер XGBoost.

Розгорніть конвеєр виведення до кінцевої точки

Процес розгортання включає лише кілька рядків коду:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Давайте налаштуємо нашу кінцеву точку для передбачення за допомогою предиктора:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Тепер, коли наша кінцева точка та предиктор готові, настав час використати дані тестування, які ми відклали, і перевірити точність нашої моделі. Ми починаємо з визначення службової функції, яка надсилає дані по одному рядку в нашу кінцеву точку висновку та отримує у відповідь передбачення. Тому що у нас є XGBoost моделі, ми видаляємо цільову змінну перед тим, як надсилати рядок CSV до кінцевої точки. Крім того, ми видалили заголовок із тестового CSV перед циклічним переглядом файлу, що також є ще однією вимогою для XGBoost на SageMaker. Перегляньте наступний код:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

Наступний знімок екрана показує наш результат.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Тепер обчислимо точність нашої моделі.
Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.

Дивіться наступний код:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Отримуємо точність 92%. Це трохи менше, ніж 96%, отримане під час етапу перевірки, але все одно достатньо високо. Ми не очікуємо, що точність буде такою ж, оскільки тест виконується з новим набором даних.

Приймання даних

Ми безпосередньо завантажили дані та налаштували їх для навчання. У реальному житті вам, можливо, доведеться надсилати дані безпосередньо з периферійного пристрою в озеро даних, а SageMaker завантажувати їх безпосередньо з озера даних у ноутбук.

Kinesis Data Firehose — це хороший варіант і найпростіший спосіб надійного завантаження потокових даних у озера даних, сховища даних та інструменти аналітики. Він може отримувати, перетворювати та завантажувати потокові дані в Amazon S3 та інші сховища даних AWS.

Для нашого випадку використання ми створюємо потік доставки Kinesis Data Firehose із функцією лямбда-перетворення, щоб виконувати легке очищення даних, коли вони проходять через потік. Перегляньте наступний код:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Ця лямбда-функція виконує легке перетворення даних, що надходять із пристроїв на озеро даних. Він очікує файл даних у форматі CSV.

На етапі прийому даних ми завантажуємо дані та моделюємо потік даних у Kinesis Data Firehose за допомогою функції лямбда-перетворення та в наше озеро даних S3.

Давайте змоделюємо потокове передавання кількох рядків:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Прибирати

Щоб мінімізувати витрати, важливо видалити всі ресурси, які використовуються в цій вправі. Наступний код видаляє кінцеву точку висновку SageMaker, яку ми створили, а також завантажені дані навчання та тестування:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Висновок

Інженери ML, спеціалісти з обробки даних і розробники програмного забезпечення можуть використовувати Autopilot для створення та розгортання конвеєра логічного висновку з невеликим або без досвіду програмування ML. Автопілот економить час і ресурси, використовуючи наукові дані та найкращі практики машинного навчання. Великі організації тепер можуть переключити інженерні ресурси з конфігурації інфраструктури на вдосконалення моделей і вирішення бізнес-випадків. Стартапи та невеликі організації можуть розпочати машинне навчання, маючи незначні знання або взагалі не маючи досвіду ML.

Щоб розпочати роботу з SageMaker Autopilot, див Сторінка продукту або отримати доступ до SageMaker Autopilot у SageMaker Studio.

Ми також рекомендуємо дізнатися більше про інші важливі функції, які може запропонувати SageMaker, наприклад Магазин функцій Amazon SageMaker, який інтегрується з Трубопроводи Amazon SageMaker створювати, додавати функції пошуку та виявлення, а також повторно використовувати автоматизовані робочі процеси ML. Ви можете запустити кілька симуляцій автопілота з різними функціями або цільовими варіантами у вашому наборі даних. Ви також можете підійти до цього як до проблеми динамічного розподілу транспортних засобів, у якій ваша модель намагається передбачити попит на транспортні засоби на основі часу (наприклад, часу доби чи дня тижня) або місця розташування, або їх комбінації.


Про авторів

Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Дуг Мбайя є старшим архітектором рішень, який спеціалізується на даних і аналітиці. Даг тісно співпрацює з партнерами AWS, допомагаючи їм інтегрувати рішення для обробки даних і аналітики в хмарі. Попередній досвід Дуга включає підтримку клієнтів AWS у сегменті спільного використання поїздок і доставки їжі.

Автоматизуйте спільну модель класифікації велосипедів і скутерів за допомогою Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Валеріо Перроне є менеджером прикладних наук, який працює над автоматичним налаштуванням моделі та автопілотом Amazon SageMaker.

Часова мітка:

Більше від AWS Машинне навчання