Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Automatize um modelo de classificação de bicicletas e patinetes compartilhado com o Amazon SageMaker Autopilot

Piloto automático do Amazon SageMaker possibilita que as organizações criem e implantem rapidamente um modelo de aprendizado de máquina (ML) ponta a ponta e um pipeline de inferência com apenas algumas linhas de código ou até mesmo sem nenhum código em tudo com Estúdio Amazon SageMaker. O Autopilot alivia o trabalho pesado de configuração da infraestrutura e o tempo necessário para construir um pipeline inteiro, incluindo engenharia de recursos, seleção de modelo e ajuste de hiperparâmetros.

Nesta postagem, mostramos como passar de dados brutos para um pipeline de inferência robusto e totalmente implantado com o Autopilot.

Visão geral da solução

Usamos Conjunto de dados públicos da Lyft sobre compartilhamento de bicicletas para esta simulação prever se um usuário participa ou não do Programa Bicicleta Compartilhada para Todos. Este é um problema simples de classificação binária.

Queremos mostrar como é fácil construir um pipeline de inferência automatizado e em tempo real para classificar usuários com base em sua participação no programa Bike Share for All. Para esse fim, simulamos um pipeline de ingestão e inferência de dados ponta a ponta para uma empresa imaginária de compartilhamento de bicicletas operando na área da Baía de São Francisco.

A arquitetura é dividida em duas partes: o pipeline de ingestão e o pipeline de inferência.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Nós nos concentramos principalmente no pipeline de ML na primeira seção desta postagem e revisamos o pipeline de ingestão de dados na segunda parte.

Pré-requisitos

Para acompanhar este exemplo, preencha os seguintes pré-requisitos:

  1. Crie uma nova instância de notebook SageMaker.
  2. Crie uma Mangueira de incêndio de dados do Amazon Kinesis fluxo de entrega com um AWS Lambda função de transformação. Para obter instruções, consulte Transformação de dados do Amazon Kinesis Firehose com AWS Lambda. Esta etapa é opcional e necessária apenas para simular o streaming de dados.

Exploração de dados

Vamos baixar e visualizar o conjunto de dados, que está localizado em um local público Serviço de armazenamento simples da Amazon (Amazon S3) bucket e site estático:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

A captura de tela a seguir mostra um subconjunto de dados antes da transformação.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

A última coluna dos dados contém o alvo que queremos prever, que é uma variável binária que assume um valor Sim ou Não, indicando se o usuário participa do programa Bike Share for All.

Vamos dar uma olhada na distribuição de nossa variável alvo para qualquer desequilíbrio de dados.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Conforme mostrado no gráfico acima, os dados estão desequilibrados, com menos pessoas participando do programa.

Precisamos equilibrar os dados para evitar um viés de representação excessiva. Esta etapa é opcional porque o Autopilot também oferece uma abordagem interna para lidar automaticamente com o desequilíbrio de classe, cujo padrão é uma métrica de validação de pontuação F1. Além disso, se você mesmo optar por balancear os dados, poderá usar técnicas mais avançadas para lidar com o desequilíbrio de classes, como FERIR or GAN.

Para esta postagem, reduzimos a resolução da classe majoritária (Não) como uma técnica de balanceamento de dados:

O código a seguir enriquece os dados e subamostra a classe sobre-representada:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Deixamos deliberadamente nossos recursos categóricos não codificados, incluindo nosso valor alvo binário. Isso ocorre porque o Autopilot cuida da codificação e decodificação dos dados para nós como parte da engenharia automática de recursos e da implantação do pipeline, como veremos na próxima seção.

A captura de tela a seguir mostra uma amostra de nossos dados.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Os dados nos gráficos a seguir parecem normais, com uma distribuição bimodal representando os dois picos para as horas da manhã e as horas de ponta da tarde, como seria de esperar. Também observamos baixa atividade nos finais de semana e à noite.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Na próxima seção, alimentamos os dados no Autopilot para que ele possa realizar um experimento para nós.

Construa um modelo de classificação binária

O piloto automático exige que especifiquemos os buckets de destino de entrada e saída. Ele usa o bucket de entrada para carregar os dados e o bucket de saída para salvar os artefatos, como engenharia de recursos e os notebooks Jupyter gerados. Retemos 5% do conjunto de dados para avaliar e validar o desempenho do modelo após a conclusão do treinamento e carregamos 95% do conjunto de dados para o intervalo de entrada S3. Veja o seguinte código:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Depois de carregarmos os dados para o destino de entrada, é hora de iniciar o Autopilot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Tudo o que precisamos para começar a experimentar é chamar o método fit(). O Autopilot precisa do local S3 de entrada e saída e da coluna de atributos de destino como parâmetros necessários. Após o processamento do recurso, o Autopilot chama SageMaker ajuste automático do modelo para encontrar a melhor versão de um modelo executando vários trabalhos de treinamento em seu conjunto de dados. Adicionamos o parâmetro opcional max_candidates para limitar o número de candidatos a 30, que é o número de trabalhos de treinamento que o Autopilot lança com diferentes combinações de algoritmos e hiperparâmetros para encontrar o melhor modelo. Se você não especificar esse parâmetro, o padrão será 250.

Podemos observar o progresso do Autopilot com o seguinte código:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

O treinamento leva algum tempo para ser concluído. Enquanto estiver em execução, vamos dar uma olhada no fluxo de trabalho do Autopilot.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Para encontrar o melhor candidato, use o seguinte código:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

A captura de tela a seguir mostra nossa saída.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Nosso modelo alcançou uma precisão de validação de 96%, então vamos implantá-lo. Poderíamos adicionar uma condição tal que só usaríamos o modelo se a precisão estivesse acima de um determinado nível.

Pipeline de inferência

Antes de implantarmos nosso modelo, vamos examinar nosso melhor candidato e o que está acontecendo em nosso pipeline. Veja o seguinte código:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

O diagrama a seguir mostra nossa saída.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

O Autopilot construiu o modelo e o empacotou em três contêineres diferentes, cada um executando sequencialmente uma tarefa específica: transformar, prever e transformar reversamente. Esta inferência em vários passos é possível com um Pipeline de inferência do SageMaker.

Uma inferência em várias etapas também pode encadear vários modelos de inferência. Por exemplo, um contêiner pode executar análise do componente principal antes de passar os dados para o contêiner XGBoost.

Implantar o pipeline de inferência em um endpoint

O processo de implantação envolve apenas algumas linhas de código:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Vamos configurar nosso endpoint para previsão com um preditor:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Agora que temos nosso endpoint e preditor prontos, é hora de usar os dados de teste que reservamos e testar a precisão do nosso modelo. Começamos definindo uma função de utilidade que envia os dados uma linha por vez para nosso ponto final de inferência e obtém uma previsão em troca. Porque temos um XGBoostName modelo, eliminamos a variável de destino antes de enviar a linha CSV para o endpoint. Além disso, removemos o cabeçalho do CSV de teste antes de percorrer o arquivo, o que também é outro requisito para o XGBoost no SageMaker. Veja o seguinte código:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

A captura de tela a seguir mostra nossa saída.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Agora vamos calcular a precisão do nosso modelo.
Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.

Veja o seguinte código:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Obtemos uma precisão de 92%. Isto é um pouco inferior aos 96% obtidos durante a etapa de validação, mas ainda é alto o suficiente. Não esperamos que a precisão seja exatamente a mesma porque o teste é realizado com um novo conjunto de dados.

Ingestão de dados

Baixamos os dados diretamente e os configuramos para treinamento. Na vida real, pode ser necessário enviar os dados diretamente do dispositivo de borda para o data lake e fazer com que o SageMaker os carregue diretamente do data lake para o notebook.

O Kinesis Data Firehose é uma boa opção e a maneira mais direta de carregar dados de streaming de maneira confiável em data lakes, armazenamentos de dados e ferramentas de análise. Ele pode capturar, transformar e carregar dados de streaming no Amazon S3 e em outros armazenamentos de dados da AWS.

Para nosso caso de uso, criamos um fluxo de entrega do Kinesis Data Firehose com uma função de transformação Lambda para fazer uma limpeza leve de dados à medida que eles atravessam o fluxo. Veja o seguinte código:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Esta função Lambda realiza uma leve transformação dos dados transmitidos dos dispositivos para o data lake. Ele espera um arquivo de dados formatado em CSV.

Para a etapa de ingestão, baixamos os dados e simulamos um fluxo de dados para o Kinesis Data Firehose com uma função de transformação Lambda e para nosso data lake S3.

Vamos simular o streaming de algumas linhas:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

limpar

É importante excluir todos os recursos usados ​​neste exercício para minimizar custos. O código a seguir exclui o endpoint de inferência do SageMaker que criamos, bem como os dados de treinamento e teste que carregamos:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Conclusão

Engenheiros de ML, cientistas de dados e desenvolvedores de software podem usar o Autopilot para criar e implantar um pipeline de inferência com pouca ou nenhuma experiência em programação de ML. O Autopilot economiza tempo e recursos, usando ciência de dados e práticas recomendadas de ML. As grandes organizações podem agora transferir recursos de engenharia da configuração da infraestrutura para melhorar modelos e resolver casos de uso de negócios. Startups e organizações menores podem começar a usar o aprendizado de máquina com pouco ou nenhum conhecimento em ML.

Para começar a usar o SageMaker Autopilot, consulte o página do produto FlexSim ou acesse o SageMaker Autopilot no SageMaker Studio.

Também recomendamos aprender mais sobre outros recursos importantes que o SageMaker tem a oferecer, como o Loja de recursos Amazon SageMaker, que se integra com Pipelines Amazon SageMaker para criar, adicionar pesquisa e descoberta de recursos e reutilizar fluxos de trabalho automatizados de ML. Você pode executar várias simulações do Autopilot com diferentes recursos ou variantes de destino em seu conjunto de dados. Você também pode abordar isso como um problema dinâmico de alocação de veículos, no qual seu modelo tenta prever a demanda de veículos com base no tempo (como hora do dia ou dia da semana) ou localização, ou uma combinação de ambos.


Sobre os autores

Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Doug Mbaya é arquiteto de soluções sênior com foco em dados e análises. Doug trabalha em estreita colaboração com parceiros da AWS, ajudando-os a integrar soluções de dados e análises na nuvem. A experiência anterior de Doug inclui suporte a clientes da AWS no segmento de compartilhamento de viagens e entrega de comida.

Automatize um modelo de classificação compartilhado de bicicletas e scooters com Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Valério Perrone é um gerente de ciência aplicada que trabalha no ajuste automático de modelos e no piloto automático do Amazon SageMaker.

Carimbo de hora:

Mais de Aprendizado de máquina da AWS