Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Automatizza un modello di classificazione di biciclette e scooter condivisi con Amazon SageMaker Autopilot

Pilota automatico Amazon SageMaker consente alle organizzazioni di creare e distribuire rapidamente un modello di machine learning (ML) end-to-end e una pipeline di inferenza con poche righe di codice o addirittura senza alcun codice affatto con Amazon Sage Maker Studio. Autopilot allevia il lavoro pesante della configurazione dell'infrastruttura e il tempo necessario per costruire un'intera pipeline, inclusa l'ingegneria delle funzionalità, la selezione del modello e l'ottimizzazione degli iperparametri.

In questo post, mostriamo come passare dai dati grezzi a una pipeline di inferenza solida e completamente distribuita con Autopilot.

Panoramica della soluzione

Usiamo Il set di dati pubblico di Lyft sul bike sharing per questa simulazione per prevedere se un utente partecipa o meno al Programma Bike Share for All. Questo è un semplice problema di classificazione binaria.

Vogliamo mostrare quanto sia facile creare una pipeline di inferenza automatizzata e in tempo reale per classificare gli utenti in base alla loro partecipazione al programma Bike Share for All. A tal fine, simuliamo una pipeline di acquisizione e inferenza di dati end-to-end per una società di bike sharing immaginaria che opera nella Bay Area di San Francisco.

L'architettura è suddivisa in due parti: la pipeline di importazione e la pipeline di inferenza.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Ci concentriamo principalmente sulla pipeline ML nella prima sezione di questo post e esaminiamo la pipeline di importazione dei dati nella seconda parte.

Prerequisiti

Per seguire questo esempio, completare i seguenti prerequisiti:

  1. Crea una nuova istanza notebook SageMaker.
  2. Creare un Firehose dati Amazon Kinesis flusso di consegna con un AWS Lambda funzione di trasformazione. Per istruzioni, vedere Trasformazione dei dati Amazon Kinesis Firehose con AWS Lambda. Questo passaggio è facoltativo e necessario solo per simulare lo streaming di dati.

Esplorazione dei dati

Scarichiamo e visualizziamo il dataset, che si trova in un public Servizio di archiviazione semplice Amazon Bucket (Amazon S3) e sito Web statico:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

La schermata seguente mostra un sottoinsieme dei dati prima della trasformazione.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

L'ultima colonna dei dati contiene il target che vogliamo prevedere, che è una variabile binaria che assume un valore Sì o No, indicando se l'utente partecipa al programma Bike Share for All.

Diamo un'occhiata alla distribuzione della nostra variabile target per qualsiasi squilibrio di dati.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Come mostrato nel grafico sopra, i dati sono sbilanciati, con un minor numero di persone che partecipano al programma.

Abbiamo bisogno di bilanciare i dati per evitare un pregiudizio di rappresentazione eccessiva. Questo passaggio è facoltativo perché Autopilot offre anche un approccio interno per gestire automaticamente lo squilibrio di classe, che per impostazione predefinita è una metrica di convalida del punteggio F1. Inoltre, se scegli di bilanciare i dati da solo, puoi utilizzare tecniche più avanzate per gestire lo squilibrio di classe, ad esempio percossero or GAN.

Per questo post, eseguiamo il downsampling della classe di maggioranza (No) come tecnica di bilanciamento dei dati:

Il codice seguente arricchisce i dati e sottocampiona la classe sovrarappresentata:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Abbiamo deliberatamente lasciato le nostre caratteristiche categoriali non codificate, incluso il nostro valore target binario. Questo perché Autopilot si occupa della codifica e della decodifica dei dati per noi come parte dell'ingegneria automatica delle funzionalità e della distribuzione della pipeline, come vedremo nella sezione successiva.

Lo screenshot seguente mostra un campione dei nostri dati.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

I dati nei grafici seguenti sembrano per il resto normali, con una distribuzione bimodale che rappresenta i due picchi per le ore mattutine e le ore di punta pomeridiane, come ci si aspetterebbe. Osserviamo anche attività basse nei fine settimana e di notte.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Nella sezione successiva, inseriamo i dati in Autopilot in modo che possa eseguire un esperimento per noi.

Costruisci un modello di classificazione binaria

Il pilota automatico richiede di specificare i bucket di destinazione di input e output. Utilizza il bucket di input per caricare i dati e il bucket di output per salvare gli artefatti, come l'ingegneria delle funzionalità e i notebook Jupyter generati. Conserviamo il 5% del set di dati per valutare e convalidare le prestazioni del modello al termine dell'addestramento e caricare il 95% del set di dati nel bucket di input S3. Vedere il codice seguente:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Dopo aver caricato i dati nella destinazione di input, è il momento di avviare Autopilot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Tutto ciò di cui abbiamo bisogno per iniziare a sperimentare è chiamare il metodo fit(). Autopilot necessita della posizione di input e output S3 e della colonna dell'attributo di destinazione come parametri richiesti. Dopo l'elaborazione della funzione, Autopilot chiama Ottimizzazione automatica del modello di SageMaker per trovare la versione migliore di un modello eseguendo molti processi di addestramento sul set di dati. Abbiamo aggiunto il parametro facoltativo max_candidates per limitare il numero di candidati a 30, che è il numero di lavori di formazione che Autopilot avvia con diverse combinazioni di algoritmi e iperparametri per trovare il modello migliore. Se non si specifica questo parametro, il valore predefinito è 250.

Possiamo osservare l'andamento di Autopilot con il seguente codice:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

La formazione richiede del tempo per essere completata. Mentre è in esecuzione, diamo un'occhiata al flusso di lavoro di Autopilot.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Per trovare il miglior candidato, utilizzare il seguente codice:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

Lo screenshot seguente mostra il nostro output.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Il nostro modello ha raggiunto un'accuratezza di convalida del 96%, quindi lo implementeremo. Potremmo aggiungere una condizione tale da utilizzare il modello solo se la precisione è superiore a un certo livello.

Conduttura di inferenza

Prima di distribuire il nostro modello, esaminiamo il nostro miglior candidato e cosa sta accadendo nella nostra pipeline. Vedere il codice seguente:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Il diagramma seguente mostra il nostro output.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Autopilot ha creato il modello e lo ha impacchettato in tre diversi contenitori, ognuno dei quali esegue in sequenza un'attività specifica: trasformazione, previsione e trasformazione inversa. Questa inferenza a più passaggi è possibile con a Pipeline di inferenza SageMaker.

Un'inferenza a più passaggi può anche concatenare più modelli di inferenza. Ad esempio, un container può funzionare analisi dei componenti principali prima di passare i dati al contenitore XGBoost.

Distribuire la pipeline di inferenza su un endpoint

Il processo di distribuzione prevede solo poche righe di codice:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Configuriamo il nostro endpoint per la previsione con un predittore:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Ora che abbiamo l'endpoint e il predittore pronti, è il momento di utilizzare i dati di test che abbiamo messo da parte e testare l'accuratezza del nostro modello. Iniziamo definendo una funzione di utilità che invia i dati una riga alla volta al nostro endpoint di inferenza e ottiene in cambio una previsione. Perché abbiamo un XGBoost modello, eliminiamo la variabile di destinazione prima di inviare la riga CSV all'endpoint. Inoltre, abbiamo rimosso l'intestazione dal CSV di test prima di eseguire il loop del file, che è anche un altro requisito per XGBoost su SageMaker. Vedere il codice seguente:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

Lo screenshot seguente mostra il nostro output.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Ora calcoliamo la precisione del nostro modello.
Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.

Vedi il seguente codice:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Otteniamo una precisione del 92%. Questo è leggermente inferiore al 96% ottenuto durante la fase di convalida, ma è comunque abbastanza alto. Non ci aspettiamo che la precisione sia esattamente la stessa perché il test viene eseguito con un nuovo set di dati.

Importazione dei dati

Abbiamo scaricato direttamente i dati e li abbiamo configurati per l'allenamento. Nella vita reale, potrebbe essere necessario inviare i dati direttamente dal dispositivo perimetrale al data lake e fare in modo che SageMaker li carichi direttamente dal data lake nel notebook.

Kinesis Data Firehose è una buona opzione e il modo più semplice per caricare in modo affidabile i dati in streaming in data lake, data store e strumenti di analisi. Può acquisire, trasformare e caricare dati in streaming in Amazon S3 e altri datastore AWS.

Per il nostro caso d'uso, creiamo un flusso di distribuzione Kinesis Data Firehose con una funzione di trasformazione Lambda per eseguire una leggera pulizia dei dati mentre attraversa il flusso. Vedere il codice seguente:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Questa funzione Lambda esegue una leggera trasformazione dei dati trasmessi dai dispositivi al data lake. Si aspetta un file di dati in formato CSV.

Per la fase di importazione, scarichiamo i dati e simuliamo un flusso di dati in Kinesis Data Firehose con una funzione di trasformazione Lambda e nel nostro data lake S3.

Simuliamo lo streaming di alcune righe:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

ripulire

È importante eliminare tutte le risorse utilizzate in questo esercizio per ridurre al minimo i costi. Il codice seguente elimina l'endpoint di inferenza SageMaker che abbiamo creato e i dati di addestramento e test che abbiamo caricato:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Conclusione

Gli ingegneri di ML, i data scientist e gli sviluppatori di software possono utilizzare Autopilot per creare e distribuire una pipeline di inferenza con poca o nessuna esperienza di programmazione ML. Autopilot consente di risparmiare tempo e risorse, utilizzando le best practice di data science e ML. Le grandi organizzazioni possono ora spostare le risorse di progettazione dalla configurazione dell'infrastruttura al miglioramento dei modelli e alla risoluzione dei casi d'uso aziendali. Le startup e le organizzazioni più piccole possono iniziare a utilizzare l'apprendimento automatico con poca o nessuna esperienza di machine learning.

Per iniziare con SageMaker Autopilot, vedere il   o accedi a SageMaker Autopilot all'interno di SageMaker Studio.

Ti consigliamo inoltre di saperne di più su altre importanti funzionalità che SageMaker ha da offrire, come il Negozio di funzionalità Amazon SageMaker, che si integra con Pipeline di Amazon SageMaker per creare, aggiungere funzionalità di ricerca e individuazione e riutilizzare flussi di lavoro automatizzati di ML. Puoi eseguire più simulazioni di Autopilot con diverse caratteristiche o varianti di destinazione nel tuo set di dati. Potresti anche affrontare questo problema come un problema di allocazione dinamica del veicolo in cui il tuo modello cerca di prevedere la domanda del veicolo in base all'ora (come l'ora del giorno o il giorno della settimana) o alla posizione, o una combinazione di entrambi.


Informazioni sugli autori

Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Doug Mbaya è un architetto di soluzioni senior con particolare attenzione ai dati e all'analisi. Doug lavora a stretto contatto con i partner AWS, aiutandoli a integrare dati e soluzioni di analisi nel cloud. L'esperienza precedente di Doug include il supporto ai clienti AWS nel segmento del ride sharing e della consegna di cibo.

Automatizza un modello di classificazione condiviso di biciclette e scooter con Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Valerio Perrone è un Applied Science Manager che lavora su Amazon SageMaker Automatic Model Tuning e Autopilot.

Timestamp:

Di più da Apprendimento automatico di AWS