Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Автоматизируйте общую модель классификации велосипедов и самокатов с помощью Amazon SageMaker Autopilot

Amazon SageMaker Автопилот позволяет организациям быстро создавать и развертывать модель сквозного машинного обучения (ML) и конвейер вывода с помощью всего лишь нескольких строк кода или даже без какого-либо кода вообще с Студия Amazon SageMaker. Автопилот снимает с вас тяжелую работу по настройке инфраструктуры и время, необходимое для построения всего конвейера, включая разработку функций, выбор модели и настройку гиперпараметров.

В этом посте мы покажем, как перейти от необработанных данных к надежному и полностью развернутому конвейеру вывода с помощью автопилота.

Обзор решения

МЫ ИСПОЛЬЗУЕМ Публичный набор данных Lyft о совместном использовании велосипедов для этого моделирования, чтобы предсказать, будет ли пользователь участвовать в Программа «Велопрокат для всех». Это простая задача двоичной классификации.

Мы хотим продемонстрировать, насколько легко создать автоматизированный конвейер вывода в режиме реального времени для классификации пользователей на основе их участия в программе Bike Share for All. С этой целью мы моделируем сквозной процесс приема и вывода данных для воображаемой компании по прокату велосипедов, работающей в районе залива Сан-Франциско.

Архитектура разбита на две части: конвейер приема и конвейер вывода.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

В первом разделе этой статьи мы в первую очередь сосредоточимся на конвейере ML, а во второй части — на конвейере приема данных.

Предпосылки

Чтобы следовать этому примеру, выполните следующие предварительные условия:

  1. Создайте новый экземпляр блокнота SageMaker..
  2. Создать Пожарный шланг данных Amazon Kinesis поток доставки с AWS Lambda функция преобразования. Инструкции см. Преобразование данных Amazon Kinesis Firehose с помощью AWS Lambda. Этот шаг не является обязательным и необходим только для имитации потоковой передачи данных.

Исследование данных

Давайте загрузим и визуализируем набор данных, который находится в публичном доступе. Простой сервис хранения Amazon (Amazon S3) Корзина и статический веб-сайт:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

На следующем снимке экрана показано подмножество данных перед преобразованием.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Последний столбец данных содержит цель, которую мы хотим спрогнозировать, которая представляет собой двоичную переменную, принимающую значение «Да» или «Нет», указывающую, участвует ли пользователь в программе Bike Share for All.

Давайте посмотрим на распределение нашей целевой переменной при любом дисбалансе данных.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Как показано на графике выше, данные несбалансированы: в программе участвует меньшее количество людей.

Нам необходимо сбалансировать данные, чтобы предотвратить предвзятость в отношении чрезмерной представленности. Этот шаг является необязательным, поскольку автопилот также предлагает внутренний подход для автоматической обработки дисбаланса классов, который по умолчанию использует метрику проверки оценки F1. Кроме того, если вы решите сбалансировать данные самостоятельно, вы можете использовать более сложные методы обработки дисбаланса классов, такие как Пронзила or ГАН.

В этом посте мы уменьшаем выборку класса большинства (Нет) в качестве метода балансировки данных:

Следующий код обогащает данные и занижает выборку перепредставленного класса:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Мы намеренно оставили наши категориальные функции незакодированными, включая двоичное целевое значение. Это связано с тем, что автопилот заботится о кодировании и декодировании данных за нас в рамках автоматического проектирования функций и развертывания конвейера, как мы увидим в следующем разделе.

На следующем снимке экрана показан образец наших данных.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Данные на следующих графиках в остальном выглядят нормально: бимодальное распределение представляет собой два пика для утренних часов и дневных часов пик, как и следовало ожидать. Также мы наблюдаем низкую активность в выходные и ночное время.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

В следующем разделе мы передаем данные автопилоту, чтобы он мог провести за нас эксперимент.

Постройте модель двоичной классификации

Автопилот требует, чтобы мы указали сегменты назначения ввода и вывода. Он использует входную корзину для загрузки данных и выходную корзину для сохранения артефактов, таких как разработка функций и созданные блокноты Jupyter. Мы сохраняем 5 % набора данных для оценки и проверки эффективности модели после завершения обучения и загружаем 95 % набора данных во входную корзину S3. См. следующий код:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

После того, как мы загрузим данные в место назначения ввода, пришло время запустить автопилот:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Все, что нам нужно, чтобы начать экспериментировать, — это вызвать метод fit(). В качестве обязательных параметров автопилоту требуется расположение входного и выходного S3, а также столбец целевого атрибута. После обработки функции автопилот вызывает SageMaker автоматическая настройка модели найти лучшую версию модели, запустив множество обучающих заданий в вашем наборе данных. Мы добавили необязательный параметр max_candidates, чтобы ограничить количество кандидатов до 30, то есть количества заданий обучения, которые автопилот запускает с различными комбинациями алгоритмов и гиперпараметров, чтобы найти лучшую модель. Если вы не укажете этот параметр, по умолчанию он будет равен 250.

Мы можем наблюдать за ходом работы автопилота с помощью следующего кода:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

Обучение занимает некоторое время. Пока он работает, давайте посмотрим на рабочий процесс автопилота.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Чтобы найти лучшего кандидата, используйте следующий код:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

На следующем снимке экрана показан наш результат.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Наша модель достигла точности проверки 96 %, поэтому мы собираемся ее внедрить. Мы могли бы добавить условие, согласно которому мы будем использовать модель только в том случае, если точность превышает определенный уровень.

Конвейер логического вывода

Прежде чем развернуть нашу модель, давайте рассмотрим нашего лучшего кандидата и то, что происходит в нашем конвейере. См. следующий код:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

На следующей диаграмме показаны наши результаты.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Автопилот построил модель и упаковал ее в три разных контейнера, каждый из которых последовательно выполняет определенную задачу: преобразование, прогнозирование и обратное преобразование. Этот многоэтапный вывод возможен с помощью Конвейер вывода SageMaker.

Многоэтапный вывод также может объединять несколько моделей вывода. Например, один контейнер может выполнять анализ основных компонентов перед передачей данных в контейнер XGBoost.

Развертывание конвейера вывода в конечной точке

Процесс развертывания включает всего несколько строк кода:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Давайте настроим нашу конечную точку для прогнозирования с помощью предиктора:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Теперь, когда у нас есть готовая конечная точка и предиктор, пришло время использовать отложенные нами тестовые данные и проверить точность нашей модели. Мы начнем с определения вспомогательной функции, которая отправляет данные по одной строке за раз в нашу конечную точку вывода и получает взамен прогноз. Потому что у нас есть XGBoost модели мы удаляем целевую переменную перед отправкой строки CSV в конечную точку. Кроме того, мы удалили заголовок из тестового CSV перед циклическим просмотром файла, что также является еще одним требованием для XGBoost в SageMaker. См. следующий код:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

На следующем снимке экрана показан наш результат.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Теперь посчитаем точность нашей модели.
Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.

Смотрите следующий код:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Получаем точность 92%. Это немного ниже, чем 96%, полученные на этапе валидации, но все же достаточно высоко. Мы не ожидаем, что точность будет такой же, поскольку тест выполняется с новым набором данных.

Прием данных

Мы скачали данные напрямую и настроили их для обучения. В реальной жизни вам, возможно, придется отправить данные непосредственно с периферийного устройства в озеро данных, а SageMaker загрузит их непосредственно из озера данных в ноутбук.

Kinesis Data Firehose — это хороший вариант и самый простой способ надежной загрузки потоковых данных в озера данных, хранилища данных и инструменты аналитики. Он может собирать, преобразовывать и загружать потоковые данные в Amazon S3 и другие хранилища данных AWS.

В нашем случае мы создаем поток доставки Kinesis Data Firehose с функцией преобразования Lambda, чтобы выполнять некоторую упрощенную очистку данных по мере их прохождения через поток. См. следующий код:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Эта функция Lambda выполняет легкое преобразование данных, передаваемых с устройств в озеро данных. Он ожидает файл данных в формате CSV.

На этапе приема мы загружаем данные и моделируем поток данных в Kinesis Data Firehose с помощью функции лямбда-преобразования и в наше озеро данных S3.

Давайте смоделируем потоковую передачу нескольких строк:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Убирать

Чтобы минимизировать затраты, важно удалить все ресурсы, используемые в этом упражнении. Следующий код удаляет созданную нами конечную точку вывода SageMaker, а также загруженные нами данные обучения и тестирования:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Заключение

Инженеры машинного обучения, специалисты по данным и разработчики программного обеспечения могут использовать Autopilot для создания и развертывания конвейера вывода, практически не имея опыта программирования машинного обучения. Автопилот экономит время и ресурсы, используя науку о данных и лучшие практики машинного обучения. Крупные организации теперь могут переключить инженерные ресурсы с настройки инфраструктуры на улучшение моделей и решение бизнес-примеров использования. Стартапы и небольшие организации могут начать заниматься машинным обучением, практически не имея опыта ML.

Чтобы начать работу с автопилотом SageMaker, см. странице продукта или получите доступ к SageMaker Autopilot в SageMaker Studio.

Мы также рекомендуем узнать больше о других важных функциях, которые может предложить SageMaker, таких как Магазин функций Amazon SageMaker, который интегрируется с Конвейеры Amazon SageMaker создавать, добавлять функции поиска и обнаружения, а также повторно использовать автоматизированные рабочие процессы машинного обучения. Вы можете запустить несколько симуляций автопилота с разными вариантами объектов или целевых объектов в вашем наборе данных. Вы также можете подойти к этому как к проблеме динамического распределения транспортных средств, в которой ваша модель пытается предсказать спрос на транспортные средства на основе времени (например, времени суток или дня недели) или местоположения или их комбинации.


Об авторах

Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Дуг Мбая — старший архитектор решений, специализирующийся на данных и аналитике. Дуг тесно сотрудничает с партнерами AWS, помогая им интегрировать решения для обработки данных и аналитики в облаке. Предыдущий опыт Дуга включает поддержку клиентов AWS в сегменте совместного использования поездок и доставки еды.

Автоматизируйте модель классификации велосипедов и самокатов общего пользования с помощью Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Валерио Перроне — менеджер по прикладным наукам, работающий над автоматической настройкой моделей и автопилотом Amazon SageMaker.

Отметка времени:

Больше от Машинное обучение AWS