使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

使用 Amazon SageMaker Autopilot 自动化共享自行车和踏板车分类模型

亚马逊SageMaker自动驾驶仪 使组织只需几行代码甚至可以快速构建和部署端到端机器学习 (ML) 模型和推理管道 没有任何代码 完全有 亚马逊SageMaker Studio. Autopilot 减轻了配置基础设施的繁重工作和构建整个管道所需的时间,包括特征工程、模型选择和超参数调整。

在这篇文章中,我们展示了如何使用 Autopilot 从原始数据转变为强大且完全部署的推理管道。

解决方案概述

我们使用 Lyft 关于共享单车的公共数据集 用于此模拟来预测用户是否参与 共享单车计划. 这是一个简单的二元分类问题。

我们想展示构建自动化和实时推理管道以根据用户参与 Bike Share for All 计划对用户进行分类是多么容易。 为此,我们为在旧金山湾区运营的一家假想的共享单车公司模拟了端到端的数据摄取和推理管道。

该架构分为两部分:摄取管道和推理管道。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

我们在本文的第一部分主要关注 ML 管道,并在第二部分回顾数据摄取管道。

先决条件

要遵循此示例,请完成以下先决条件:

  1. 创建一个新的 SageMaker 笔记本实例.
  2. 创建 亚马逊 Kinesis 数据流水线 交付流与 AWS Lambda 变换函数。 有关说明,请参阅 使用 AWS Lambda 进行 Amazon Kinesis Firehose 数据转换. 此步骤是可选的,仅在模拟数据流时需要。

数据探索

让我们下载并可视化位于公共的数据集 亚马逊简单存储服务 (Amazon S3) 存储桶和静态网站:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

以下屏幕截图显示了转换前的数据子集。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

数据的最后一列包含我们要预测的目标,它是一个二进制变量,取 Yes 或 No 值,表示用户是否参与了 ​​Bike Share for All 计划。

让我们看一下任何数据不平衡的目标变量的分布。

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

如上图所示,数据不平衡,参与计划的人数较少。

我们需要平衡数据以防止过度表示偏差。 此步骤是可选的,因为 Autopilot 还提供了一种自动处理类不平衡的内部方法,默认为 F1 分数验证指标。 此外,如果您选择自己平衡数据,则可以使用更高级的技术来处理类不平衡,例如 冒烟 or .

在这篇文章中,我们将多数类(否)作为数据平衡技术进行下采样:

以下代码丰富了数据并对过度表示的类进行了欠采样:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

我们故意不编码我们的分类特征,包括我们的二进制目标值。 这是因为 Autopilot 负责为我们编码和解码数据,作为自动特征工程和管道部署的一部分,我们将在下一节中看到。

以下屏幕截图显示了我们的数据示例。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

下图中的数据看起来很正常,如您所料,双峰分布代表上午和下午高峰时间的两个峰值。 我们还观察到周末和晚上的低活动。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

在下一部分中,我们将数据提供给 Autopilot,以便它可以为我们运行实验。

构建二元分类模型

Autopilot 要求我们指定输入和输出目标存储桶。 它使用输入存储桶加载数据,使用输出存储桶保存工件,例如特征工程和生成的 Jupyter 笔记本。 我们保留 5% 的数据集用于在训练完成后评估和验证模型的性能,并将 95% 的数据集上传到 S3 输入存储桶。 请参阅以下代码:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

在我们将数据上传到输入目的地之后,就该启动 Autopilot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

我们需要开始试验的只是调用 fit() 方法。 Autopilot 需要输入输出 S3 位置和目标属性列作为必填参数。 特征处理后,Autopilot 调用 SageMaker自动模型调整 通过在数据集上运行许多训练作业来找到模型的最佳版本。 我们添加了可选的 max_candidates 参数以将候选者的数量限制为 30,这是 Autopilot 使用不同的算法和超参数组合启动的训练作业的数量,以找到最佳模型。 如果不指定此参数,则默认为 250。

我们可以通过以下代码观察 Autopilot 的进度:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

培训需要一些时间才能完成。 在它运行时,让我们看看 Autopilot 工作流程。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

要找到最佳候选者,请使用以下代码:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

以下屏幕截图显示了我们的输出。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

我们的模型实现了 96% 的验证准确率,因此我们将对其进行部署。 我们可以添加一个条件,使我们仅在准确度高于某个水平时才使用该模型。

推理管道

在我们部署我们的模型之前,让我们检查一下我们的最佳候选者以及我们的管道中发生了什么。 请参阅以下代码:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

下图显示了我们的输出。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

Autopilot 已构建模型并将其打包在三个不同的容器中,每个容器依次运行特定任务:变换、预测和反向变换。 这种多步推理是可能的 SageMaker 推理管道。

多步推理也可以链接多个推理模型。 例如,一个容器可以执行 主成分分析 在将数据传递到 XGBoost 容器之前。

将推理管道部署到端点

部署过程只涉及几行代码:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

让我们使用预测器配置我们的端点进行预测:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

现在我们已经准备好端点和预测器,是时候使用我们预留的测试数据并测试模型的准确性了。 我们首先定义一个效用函数,将数据一次一行地发送到我们的推理端点,并得到一个预测作为回报。 因为我们有一个 XGBoost 模型,我们在将 CSV 行发送到端点之前删除目标变量。 此外,我们在循环文件之前从测试 CSV 中删除了标头,这也是 SageMaker 上 XGBoost 的另一个要求。 请参阅以下代码:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

以下屏幕截图显示了我们的输出。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

现在让我们计算模型的准确性。
使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。

请参见以下代码:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

我们得到 92% 的准确率。 这比验证步骤中获得的 96% 略低,但仍然足够高。 我们不希望准确度完全相同,因为测试是使用新数据集执行的。

数据提取

我们直接下载数据并对其进行配置以进行训练。 在现实生活中,您可能必须将数据直接从边缘设备发送到数据湖中,并让 SageMaker 将其直接从数据湖加载到笔记本中。

Kinesis Data Firehose 是一个不错的选择,也是将流数据可靠地加载到数据湖、数据存储和分析工具中的最直接方法。 它可以捕获、转换流数据并将其加载到 Amazon S3 和其他 AWS 数据存储中。

对于我们的用例,我们创建了一个带有 Lambda 转换函数的 Kinesis Data Firehose 传输流,以便在它遍历流时执行一些轻量级数据清理。 请参阅以下代码:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

此 Lambda 函数对从设备流式传输到数据湖的数据执行轻度转换。 它需要一个 CSV 格式的数据文件。

对于摄取步骤,我们下载数据并使用 Lambda 转换函数将数据流模拟到 Kinesis Data Firehose 和我们的 S3 数据湖中。

让我们模拟几行流式传输:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

清理

删除本练习中使用的所有资源以最大限度地降低成本非常重要。 以下代码删除了我们创建的 SageMaker 推理端点以及我们上传的训练和测试数据:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

结论

ML 工程师、数据科学家和软件开发人员可以使用 Autopilot 构建和部署推理管道,而几乎没有 ML 编程经验。 Autopilot 使用数据科学和 ML 最佳实践节省时间和资源。 大型组织现在可以将工程资源从基础设施配置转移到改进模型和解决业务用例。 初创公司和小型组织可以在几乎没有 ML 专业知识的情况下开始机器学习。

要开始使用 SageMaker Autopilot,请参阅 产品页面 或在 SageMaker Studio 中访问 SageMaker Autopilot。

我们还建议您详细了解 SageMaker 必须提供的其他重要功能,例如 Amazon SageMaker功能商店,它与 Amazon SageMaker管道 创建、添加特征搜索和发现,以及重用自动化 ML 工作流程。 您可以使用数据集中的不同特征或目标变体运行多个 Autopilot 模拟。 您也可以将此视为动态车辆分配问题,您的模型尝试根据时间(例如一天中的时间或一周中的一天)或位置或两者的组合来预测车辆需求。


作者简介

使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。道格·姆巴亚 是一名高级解决方案架构师,专注于数据和分析。 Doug 与 AWS 合作伙伴密切合作,帮助他们在云中集成数据和分析解决方案。 Doug 之前的经验包括在拼车和送餐领域为 AWS 客户提供支持。

使用 Amazon SageMaker Autopilot PlatoBlockchain 数据智能自动化共享自行车和踏板车分类模型。 垂直搜索。 哎。瓦莱里奥·佩罗内 是一名应用科学经理,负责 Amazon SageMaker 自动模型调整和自动驾驶仪。

时间戳记:

更多来自 AWS机器学习