Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot

Pilote automatique Amazon SageMaker permet aux organisations de créer et de déployer rapidement un modèle d'apprentissage automatique (ML) de bout en bout et un pipeline d'inférence avec seulement quelques lignes de code ou même sans aucun code du tout avec Amazon SageMakerStudio. Le pilote automatique décharge la lourde charge de la configuration de l'infrastructure et le temps nécessaire pour créer un pipeline complet, y compris l'ingénierie des fonctionnalités, la sélection des modèles et le réglage des hyperparamètres.

Dans cet article, nous montrons comment passer des données brutes à un pipeline d'inférence robuste et entièrement déployé avec Autopilot.

Vue d'ensemble de la solution

Nous utilisons Ensemble de données public de Lyft sur le partage de vélos pour cette simulation pour prédire si oui ou non un utilisateur participe à la Programme de vélos en libre-service pour tous. Il s'agit d'un simple problème de classification binaire.

Nous voulons montrer à quel point il est facile de créer un pipeline d'inférence automatisé et en temps réel pour classer les utilisateurs en fonction de leur participation au programme Bike Share for All. À cette fin, nous simulons un pipeline d'ingestion et d'inférence de données de bout en bout pour une entreprise imaginaire de vélos en libre-service opérant dans la région de la baie de San Francisco.

L'architecture se décompose en deux parties : le pipeline d'ingestion et le pipeline d'inférence.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Nous nous concentrons principalement sur le pipeline ML dans la première section de cet article et examinons le pipeline d'ingestion de données dans la deuxième partie.

Pré-requis

Pour suivre cet exemple, remplissez les conditions préalables suivantes :

  1. Créer une nouvelle instance de bloc-notes SageMaker.
  2. Créer un Firehose de données Amazon Kinesis flux de diffusion avec un AWS Lambda fonction de transformation. Pour obtenir des instructions, consultez Transformation des données Amazon Kinesis Firehose avec AWS Lambda. Cette étape est facultative et n'est nécessaire que pour simuler le flux de données.

Exploration de données

Téléchargeons et visualisons le jeu de données, qui se trouve dans un espace public Service de stockage simple Amazon (Amazon S3) compartiment et site Web statique :

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

La capture d'écran suivante montre un sous-ensemble des données avant transformation.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

La dernière colonne des données contient la cible que nous voulons prédire, qui est une variable binaire prenant une valeur Oui ou Non, indiquant si l'utilisateur participe au programme Bike Share for All.

Examinons la distribution de notre variable cible pour tout déséquilibre de données.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Comme le montre le graphique ci-dessus, les données sont déséquilibrées, avec moins de personnes participant au programme.

Nous devons équilibrer les données pour éviter un biais de surreprésentation. Cette étape est facultative car Autopilot propose également une approche interne pour gérer automatiquement le déséquilibre de classe, qui utilise par défaut une métrique de validation du score F1. De plus, si vous choisissez d'équilibrer les données vous-même, vous pouvez utiliser des techniques plus avancées pour gérer le déséquilibre des classes, telles que SMOTÉ or GAN.

Pour cet article, nous sous-échantillonnons la classe majoritaire (Non) comme technique d'équilibrage des données :

Le code suivant enrichit les données et sous-échantillonne la classe surreprésentée :

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Nous avons délibérément laissé nos caractéristiques catégorielles non codées, y compris notre valeur cible binaire. En effet, Autopilot s'occupe d'encoder et de décoder les données pour nous dans le cadre de l'ingénierie automatique des fonctionnalités et du déploiement du pipeline, comme nous le verrons dans la section suivante.

La capture d'écran suivante montre un échantillon de nos données.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Les données des graphiques suivants semblent par ailleurs normales, avec une distribution bimodale représentant les deux pics pour les heures de pointe du matin et les heures de pointe de l'après-midi, comme on peut s'y attendre. Nous observons également de faibles activités le week-end et la nuit.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Dans la section suivante, nous transmettons les données au pilote automatique afin qu'il puisse exécuter une expérience pour nous.

Construire un modèle de classification binaire

Le pilote automatique exige que nous spécifiions les compartiments de destination d'entrée et de sortie. Il utilise le compartiment d'entrée pour charger les données et le compartiment de sortie pour enregistrer les artefacts, tels que l'ingénierie des fonctionnalités et les blocs-notes Jupyter générés. Nous conservons 5 % de l'ensemble de données pour évaluer et valider les performances du modèle une fois la formation terminée et téléchargeons 95 % de l'ensemble de données dans le compartiment d'entrée S3. Voir le code suivant :

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Une fois que nous avons téléchargé les données vers la destination d'entrée, il est temps de démarrer le pilote automatique :

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Tout ce dont nous avons besoin pour commencer à expérimenter est d'appeler la méthode fit (). Le pilote automatique a besoin de l'emplacement S3 d'entrée et de sortie et de la colonne d'attribut cible comme paramètres requis. Après le traitement des fonctionnalités, le pilote automatique appelle Réglage automatique du modèle SageMaker pour trouver la meilleure version d'un modèle en exécutant de nombreuses tâches d'entraînement sur votre ensemble de données. Nous avons ajouté le paramètre facultatif max_candidates pour limiter le nombre de candidats à 30, soit le nombre de tâches d'entraînement qu'Autopilot lance avec différentes combinaisons d'algorithmes et d'hyperparamètres afin de trouver le meilleur modèle. Si vous ne spécifiez pas ce paramètre, sa valeur par défaut est 250.

On peut observer la progression de l'Autopilot avec le code suivant :

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

La formation prend un certain temps. Pendant qu'il est en cours d'exécution, examinons le flux de travail du pilote automatique.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Pour trouver le meilleur candidat, utilisez le code suivant :

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

La capture d'écran suivante montre notre sortie.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Notre modèle a atteint une précision de validation de 96 %, nous allons donc le déployer. Nous pourrions ajouter une condition telle que nous n'utilisions le modèle que si la précision est supérieure à un certain niveau.

Pipeline d'inférence

Avant de déployer notre modèle, examinons notre meilleur candidat et ce qui se passe dans notre pipeline. Voir le code suivant :

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Le diagramme suivant montre notre sortie.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Autopilot a construit le modèle et l'a empaqueté dans trois conteneurs différents, chacun exécutant séquentiellement une tâche spécifique : transformer, prédire et transformer à l'envers. Cette inférence en plusieurs étapes est possible avec un Pipeline d'inférence SageMaker.

Une inférence en plusieurs étapes peut également enchaîner plusieurs modèles d'inférence. Par exemple, un conteneur peut effectuer analyse des composants principaux avant de transmettre les données au conteneur XGBoost.

Déployer le pipeline d'inférence sur un point de terminaison

Le processus de déploiement ne nécessite que quelques lignes de code :

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Configurons notre point de terminaison pour la prédiction avec un prédicteur :

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Maintenant que notre point de terminaison et notre prédicteur sont prêts, il est temps d'utiliser les données de test que nous avons mises de côté et de tester la précision de notre modèle. Nous commençons par définir une fonction utilitaire qui envoie les données une ligne à la fois à notre point de terminaison d'inférence et obtient une prédiction en retour. Parce que nous avons un XGBoost modèle, nous supprimons la variable cible avant d'envoyer la ligne CSV au point de terminaison. De plus, nous avons supprimé l'en-tête du CSV de test avant de parcourir le fichier, ce qui est également une autre exigence pour XGBoost sur SageMaker. Voir le code suivant :

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

La capture d'écran suivante montre notre sortie.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Calculons maintenant la précision de notre modèle.
Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.

Voir le code suivant:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Nous obtenons une précision de 92%. C'est légèrement inférieur aux 96% obtenus lors de l'étape de validation, mais c'est tout de même assez élevé. Nous ne nous attendons pas à ce que la précision soit exactement la même car le test est effectué avec un nouvel ensemble de données.

Ingestion de données

Nous avons téléchargé les données directement et les avons configurées pour la formation. Dans la vraie vie, vous devrez peut-être envoyer les données directement depuis l'appareil périphérique vers le lac de données et demander à SageMaker de les charger directement depuis le lac de données dans le bloc-notes.

Kinesis Data Firehose est une bonne option et le moyen le plus simple de charger de manière fiable des données en continu dans des lacs de données, des magasins de données et des outils d'analyse. Il peut capturer, transformer et charger des données de streaming dans Amazon S3 et d'autres magasins de données AWS.

Pour notre cas d'utilisation, nous créons un flux de diffusion Kinesis Data Firehose avec une fonction de transformation Lambda pour effectuer un nettoyage léger des données lorsqu'elles traversent le flux. Voir le code suivant :

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Cette fonction Lambda effectue une légère transformation des données diffusées depuis les appareils vers le lac de données. Il attend un fichier de données au format CSV.

Pour l'étape d'ingestion, nous téléchargeons les données et simulons un flux de données vers Kinesis Data Firehose avec une fonction de transformation Lambda et dans notre lac de données S3.

Simulons le streaming de quelques lignes :

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Nettoyer

Il est important de supprimer toutes les ressources utilisées dans cet exercice pour minimiser les coûts. Le code suivant supprime le point de terminaison d'inférence SageMaker que nous avons créé ainsi que les données d'entraînement et de test que nous avons téléchargées :

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Conclusion

Les ingénieurs ML, les scientifiques des données et les développeurs de logiciels peuvent utiliser Autopilot pour créer et déployer un pipeline d'inférence avec peu ou pas d'expérience en programmation ML. Le pilote automatique permet d'économiser du temps et des ressources, en utilisant la science des données et les meilleures pratiques de ML. Les grandes organisations peuvent désormais déplacer les ressources d'ingénierie de la configuration de l'infrastructure vers l'amélioration des modèles et la résolution des cas d'utilisation métier. Les startups et les petites organisations peuvent se lancer dans l'apprentissage automatique avec peu ou pas d'expertise en ML.

Pour démarrer avec SageMaker Autopilot, consultez le page produit ou accédez à SageMaker Autopilot dans SageMaker Studio.

Nous vous recommandons également d'en savoir plus sur les autres fonctionnalités importantes de SageMaker, telles que Magasin de fonctionnalités Amazon SageMaker, qui s'intègre à Pipelines Amazon SageMaker pour créer, ajouter la recherche et la découverte de fonctionnalités et réutiliser les workflows ML automatisés. Vous pouvez exécuter plusieurs simulations de pilote automatique avec différentes fonctionnalités ou variantes cibles dans votre ensemble de données. Vous pouvez également aborder cela comme un problème d'allocation dynamique de véhicules dans lequel votre modèle essaie de prédire la demande de véhicules en fonction du temps (comme l'heure de la journée ou le jour de la semaine) ou de l'emplacement, ou une combinaison des deux.


À propos des auteurs

Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Doug Mbaya est un architecte de solutions senior spécialisé dans les données et l'analyse. Doug travaille en étroite collaboration avec les partenaires AWS, les aidant à intégrer la solution de données et d'analyse dans le cloud. L'expérience antérieure de Doug comprend l'assistance aux clients AWS dans le segment du covoiturage et de la livraison de nourriture.

Automatisez un modèle de classification des vélos et scooters partagés avec Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Valério Perrone est un responsable des sciences appliquées travaillant sur Amazon SageMaker Automatic Model Tuning and Autopilot.

Horodatage:

Plus de Apprentissage automatique AWS