Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot

Piloto automático Amazon SageMaker hace posible que las organizaciones construyan e implementen rápidamente un modelo de aprendizaje automático (ML) de extremo a extremo y una canalización de inferencia con solo unas pocas líneas de código o incluso sin ningún código en absoluto con Estudio Amazon SageMaker. El piloto automático descarga el trabajo pesado de configurar la infraestructura y el tiempo que lleva construir una canalización completa, incluida la ingeniería de funciones, la selección de modelos y el ajuste de hiperparámetros.

En esta publicación, mostramos cómo pasar de datos sin procesar a una canalización de inferencia sólida y completamente implementada con Autopilot.

Resumen de la solución

Utilizamos Conjunto de datos públicos de Lyft sobre bicicletas compartidas para que esta simulación prediga si un usuario participa o no en el Programa de bicicletas compartidas para todos. Este es un problema simple de clasificación binaria.

Queremos mostrar lo fácil que es crear una tubería de inferencia automatizada y en tiempo real para clasificar a los usuarios en función de su participación en el programa Bike Share for All. Con este fin, simulamos una canalización de ingesta e inferencia de datos de extremo a extremo para una empresa imaginaria de bicicletas compartidas que opera en el Área de la Bahía de San Francisco.

La arquitectura se divide en dos partes: la canalización de ingestión y la canalización de inferencia.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Nos enfocamos principalmente en la canalización de ML en la primera sección de esta publicación y revisamos la canalización de ingesta de datos en la segunda parte.

Requisitos previos

Para continuar con este ejemplo, complete los siguientes requisitos previos:

  1. Crear una nueva instancia de bloc de notas de SageMaker.
  2. Crear una Manguera de bomberos de datos de Amazon Kinesis flujo de entrega con un AWS Lambda función de transformación. Para obtener instrucciones, consulte Amazon Kinesis Firehose Transformación de datos con AWS Lambda. Este paso es opcional y solo es necesario para simular la transmisión de datos.

Exploración de datos

Descarguemos y visualicemos el conjunto de datos, que se encuentra en un lugar público. Servicio de almacenamiento simple de Amazon (Amazon S3) depósito y sitio web estático:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

La siguiente captura de pantalla muestra un subconjunto de los datos antes de la transformación.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

La última columna de datos contiene el objetivo que queremos predecir, que es una variable binaria que toma un valor de Sí o No, que indica si el usuario participa en el programa Bike Share for All.

Echemos un vistazo a la distribución de nuestra variable objetivo para cualquier desequilibrio de datos.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Como se muestra en el gráfico anterior, los datos están desequilibrados, con menos personas participando en el programa.

Necesitamos equilibrar los datos para evitar un sesgo de representación excesiva. Este paso es opcional porque Autopilot también ofrece un enfoque interno para manejar el desequilibrio de clase automáticamente, que por defecto es una métrica de validación de puntuación F1. Además, si elige equilibrar los datos usted mismo, puede usar técnicas más avanzadas para manejar el desequilibrio de clases, como HERIDO or GAN.

Para esta publicación, reducimos la muestra de la clase mayoritaria (No) como técnica de balanceo de datos:

El siguiente código enriquece los datos y submuestrea la clase sobrerrepresentada:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Dejamos deliberadamente nuestras características categóricas sin codificar, incluido nuestro valor objetivo binario. Esto se debe a que Autopilot se encarga de codificar y decodificar los datos por nosotros como parte de la implementación automática de ingeniería de funciones y canalización, como veremos en la siguiente sección.

La siguiente captura de pantalla muestra una muestra de nuestros datos.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Los datos en los siguientes gráficos parecen normales, con una distribución bimodal que representa los dos picos para las horas de la mañana y las horas pico de la tarde, como era de esperar. También observamos actividades bajas los fines de semana y por la noche.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

En la siguiente sección, alimentamos los datos a Autopilot para que pueda ejecutar un experimento por nosotros.

Construir un modelo de clasificación binaria

El piloto automático requiere que especifiquemos los cubos de destino de entrada y salida. Utiliza el depósito de entrada para cargar los datos y el depósito de salida para guardar los artefactos, como la ingeniería de características y los cuadernos Jupyter generados. Retenemos el 5 % del conjunto de datos para evaluar y validar el rendimiento del modelo después de completar el entrenamiento y cargamos el 95 % del conjunto de datos en el depósito de entrada de S3. Ver el siguiente código:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Después de cargar los datos en el destino de entrada, es hora de iniciar Autopilot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Todo lo que necesitamos para comenzar a experimentar es llamar al método fit(). El piloto automático necesita la ubicación S3 de entrada y salida y la columna de atributos de destino como parámetros obligatorios. Después del procesamiento de características, las llamadas de Autopilot Ajuste automático del modelo SageMaker para encontrar la mejor versión de un modelo ejecutando muchos trabajos de entrenamiento en su conjunto de datos. Agregamos el parámetro opcional max_candidates para limitar la cantidad de candidatos a 30, que es la cantidad de trabajos de capacitación que lanza Autopilot con diferentes combinaciones de algoritmos e hiperparámetros para encontrar el mejor modelo. Si no especifica este parámetro, el valor predeterminado es 250.

Podemos observar el progreso de Autopilot con el siguiente código:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

El entrenamiento tarda algún tiempo en completarse. Mientras se ejecuta, veamos el flujo de trabajo de Autopilot.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Para encontrar el mejor candidato, utilice el siguiente código:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

La siguiente captura de pantalla muestra nuestro resultado.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Nuestro modelo logró una precisión de validación del 96 %, por lo que vamos a implementarlo. Podríamos agregar una condición de modo que solo usemos el modelo si la precisión está por encima de cierto nivel.

Canalización de inferencia

Antes de implementar nuestro modelo, examinemos nuestro mejor candidato y lo que está sucediendo en nuestra canalización. Ver el siguiente código:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

El siguiente diagrama muestra nuestra salida.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Autopilot ha creado el modelo y lo ha empaquetado en tres contenedores diferentes, cada uno de los cuales ejecuta secuencialmente una tarea específica: transformación, predicción y transformación inversa. Esta inferencia de varios pasos es posible con un Canalización de inferencia de SageMaker.

Una inferencia de varios pasos también puede encadenar varios modelos de inferencia. Por ejemplo, un contenedor puede realizar análisis de componentes principales antes de pasar los datos al contenedor XGBoost.

Implementar la canalización de inferencia en un punto final

El proceso de implementación implica solo unas pocas líneas de código:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Configuremos nuestro punto final para la predicción con un predictor:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Ahora que tenemos nuestro punto final y predictor listos, es hora de usar los datos de prueba que reservamos y probar la precisión de nuestro modelo. Comenzamos definiendo una función de utilidad que envía los datos una línea a la vez a nuestro punto final de inferencia y obtiene una predicción a cambio. porque tenemos un XGBoost modelo, soltamos la variable de destino antes de enviar la línea CSV al punto final. Además, eliminamos el encabezado del CSV de prueba antes de recorrer el archivo, que también es otro requisito para XGBoost en SageMaker. Ver el siguiente código:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

La siguiente captura de pantalla muestra nuestro resultado.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Ahora calculemos la precisión de nuestro modelo.
Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.

Ver el siguiente código:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Obtenemos una precisión del 92%. Esto es ligeramente inferior al 96 % obtenido durante el paso de validación, pero sigue siendo lo suficientemente alto. No esperamos que la precisión sea exactamente la misma porque la prueba se realiza con un nuevo conjunto de datos.

Ingestión de datos

Descargamos los datos directamente y los configuramos para el entrenamiento. En la vida real, es posible que deba enviar los datos directamente desde el dispositivo perimetral al lago de datos y hacer que SageMaker los cargue directamente desde el lago de datos a la computadora portátil.

Kinesis Data Firehose es una buena opción y la forma más sencilla de cargar de manera confiable datos de transmisión en lagos de datos, almacenes de datos y herramientas de análisis. Puede capturar, transformar y cargar datos de transmisión en Amazon S3 y otros almacenes de datos de AWS.

Para nuestro caso de uso, creamos un flujo de entrega de Kinesis Data Firehose con una función de transformación de Lambda para realizar una limpieza ligera de datos a medida que atraviesa el flujo. Ver el siguiente código:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Esta función Lambda realiza una transformación ligera de los datos transmitidos desde los dispositivos al lago de datos. Espera un archivo de datos con formato CSV.

Para el paso de ingestión, descargamos los datos y simulamos un flujo de datos en Kinesis Data Firehose con una función de transformación Lambda y en nuestro lago de datos S3.

Simulemos la transmisión de unas pocas líneas:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Limpiar

Es importante eliminar todos los recursos utilizados en este ejercicio para minimizar el costo. El siguiente código elimina el extremo de inferencia de SageMaker que creamos, así como los datos de capacitación y prueba que cargamos:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Conclusión

Los ingenieros de ML, los científicos de datos y los desarrolladores de software pueden usar Autopilot para crear e implementar una canalización de inferencia con poca o ninguna experiencia en programación de ML. El piloto automático ahorra tiempo y recursos, utilizando la ciencia de datos y las mejores prácticas de ML. Las grandes organizaciones ahora pueden cambiar los recursos de ingeniería de la configuración de la infraestructura hacia la mejora de los modelos y la resolución de casos de uso comercial. Las empresas emergentes y las organizaciones más pequeñas pueden comenzar con el aprendizaje automático con poca o ninguna experiencia en ML.

Para comenzar con SageMaker Autopilot, consulte la la página del producto o acceda a SageMaker Autopilot dentro de SageMaker Studio.

También recomendamos obtener más información sobre otras características importantes que SageMaker tiene para ofrecer, como el Tienda de funciones de Amazon SageMaker, que se integra con Canalizaciones de Amazon SageMaker para crear, agregar funciones de búsqueda y descubrimiento, y reutilizar flujos de trabajo de ML automatizados. Puede ejecutar múltiples simulaciones de Autopilot con diferentes características o variantes de objetivos en su conjunto de datos. También podría abordar esto como un problema de asignación dinámica de vehículos en el que su modelo intenta predecir la demanda de vehículos en función del tiempo (como la hora del día o el día de la semana) o la ubicación, o una combinación de ambos.


Acerca de los autores

Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.doug mbaya es un arquitecto de soluciones sénior con un enfoque en datos y análisis. Doug trabaja en estrecha colaboración con los socios de AWS, ayudándolos a integrar la solución de análisis y datos en la nube. La experiencia previa de Doug incluye el apoyo a los clientes de AWS en el segmento de viajes compartidos y entrega de alimentos.

Automatice un modelo de clasificación de bicicletas y scooters compartidos con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Valerio Perrone es un administrador de ciencias aplicadas que trabaja en Amazon SageMaker Automatic Model Tuning y Autopilot.

Sello de tiempo:

Mas de Aprendizaje automático de AWS