Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot

Amazon SageMaker Autopilot gør det muligt for organisationer hurtigt at bygge og implementere en end-to-end machine learning (ML) model og inferens pipeline med blot nogle få linjer kode eller endda uden nogen kode overhovedet med Amazon SageMaker Studio. Autopilot aflaster det tunge løft af konfiguration af infrastruktur og den tid, det tager at bygge en hel pipeline, inklusive funktionsteknik, modelvalg og hyperparameterjustering.

I dette indlæg viser vi, hvordan man går fra rådata til en robust og fuldt implementeret inferenspipeline med Autopilot.

Løsningsoversigt

Vi anvender Lyfts offentlige datasæt om cykeldeling for denne simulering at forudsige, om en bruger deltager i Bike Share for All-program. Dette er et simpelt binært klassifikationsproblem.

Vi ønsker at vise, hvor nemt det er at bygge en automatiseret og realtids-inferenspipeline for at klassificere brugere baseret på deres deltagelse i Bike Share for All-programmet. Til dette formål simulerer vi en end-to-end dataindtagelse og inferenspipeline for et imaginært bikeshare-selskab, der opererer i San Francisco Bay Area.

Arkitekturen er opdelt i to dele: indtagelsespipeline og inferenspipeline.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Vi fokuserer primært på ML-pipelinen i første afsnit af dette indlæg, og gennemgår dataindtagelsespipelinen i anden del.

Forudsætninger

For at følge dette eksempel skal du udfylde følgende forudsætninger:

  1. Opret en ny SageMaker notebook-instans.
  2. Opret en Amazon Kinesis Data Firehose leveringsstrøm med en AWS Lambda transformationsfunktion. For instruktioner, se Amazon Kinesis Firehose Data Transformation med AWS Lambda. Dette trin er valgfrit og kun nødvendigt for at simulere datastreaming.

Data efterforskning

Lad os downloade og visualisere datasættet, som er placeret i en offentlig Amazon Simple Storage Service (Amazon S3) spand og statisk websted:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

Følgende skærmbillede viser en delmængde af dataene før transformation.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Den sidste kolonne af dataene indeholder det mål, vi ønsker at forudsige, som er en binær variabel, der tager enten en Ja- eller Nej-værdi, der angiver, om brugeren deltager i Bike Share for All-programmet.

Lad os tage et kig på fordelingen af ​​vores målvariabel for enhver dataubalance.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Som vist i grafen ovenfor, er dataene ubalancerede, idet færre personer deltager i programmet.

Vi er nødt til at balancere dataene for at forhindre en overrepræsentationsbias. Dette trin er valgfrit, fordi Autopilot også tilbyder en intern tilgang til automatisk at håndtere klasseubalance, som som standard er en F1-scorevalideringsmetrik. Vælger du derudover selv at balancere dataene, kan du bruge mere avancerede teknikker til at håndtere klasseubalance, som f.eks. SMOTE or GAN.

Til dette indlæg nedsampler vi majoritetsklassen (Nej) som en databalanceringsteknik:

Følgende kode beriger dataene og undersampler den overrepræsenterede klasse:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Vi efterlod bevidst vores kategoriske funktioner ikke kodet, inklusive vores binære målværdi. Dette skyldes, at Autopilot sørger for at indkode og afkode dataene for os som en del af den automatiske funktionsudvikling og pipeline-implementering, som vi ser i næste afsnit.

Følgende skærmbillede viser et eksempel på vores data.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Dataene i de følgende grafer ser ellers normale ud, med en bimodal fordeling, der repræsenterer de to toppe for morgentimerne og eftermiddagsmyldretiderne, som du ville forvente. Vi observerer også lave aktiviteter i weekender og om natten.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

I næste afsnit feeder vi dataene til Autopilot, så den kan køre et eksperiment for os.

Byg en binær klassifikationsmodel

Autopilot kræver, at vi specificerer input- og outputdestinationsbøtter. Den bruger input-bøtten til at indlæse dataene og output-bøtten til at gemme artefakterne, såsom feature engineering og de genererede Jupyter-notebooks. Vi beholder 5 % af datasættet for at evaluere og validere modellens ydeevne, efter at træningen er afsluttet, og uploader 95 % af datasættet til S3 input-bøtten. Se følgende kode:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Når vi har uploadet dataene til inputdestinationen, er det tid til at starte Autopilot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Det eneste, vi behøver for at begynde at eksperimentere, er at kalde fit()-metoden. Autopiloten har brug for input og output S3-placeringen og målattributkolonnen som de nødvendige parametre. Efter funktionsbehandling ringer Autopilot SageMaker automatisk model tuning at finde den bedste version af en model ved at køre mange træningsjob på dit datasæt. Vi tilføjede den valgfrie max_candidates parameter for at begrænse antallet af kandidater til 30, hvilket er antallet af træningsjob, som Autopilot lancerer med forskellige kombinationer af algoritmer og hyperparametre for at finde den bedste model. Hvis du ikke angiver denne parameter, er den som standard 250.

Vi kan observere autopilotens fremskridt med følgende kode:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

Uddannelsen tager noget tid at gennemføre. Mens den kører, lad os se på autopilotens arbejdsgang.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

For at finde den bedste kandidat, brug følgende kode:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

Følgende skærmbillede viser vores output.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Vores model opnåede en valideringsnøjagtighed på 96 %, så vi vil implementere den. Vi kunne tilføje en betingelse, således at vi kun bruger modellen, hvis nøjagtigheden er over et vist niveau.

Inferens pipeline

Før vi implementerer vores model, lad os undersøge vores bedste kandidat, og hvad der sker i vores pipeline. Se følgende kode:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Følgende diagram viser vores output.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Autopilot har bygget modellen og har pakket den i tre forskellige beholdere, der hver sekventielt kører en bestemt opgave: transformere, forudsige og omvendt transformere. Denne multi-trin inferens er mulig med en SageMaker inferens pipeline.

En multi-trin inferens kan også kæde flere inferensmodeller. For eksempel kan en container udføre hovedkomponentanalyse før du sender dataene til XGBoost-beholderen.

Implementer slutningspipelinen til et slutpunkt

Implementeringsprocessen involverer kun et par linjer kode:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Lad os konfigurere vores endepunkt til forudsigelse med en forudsigelse:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Nu hvor vi har vores endepunkt og prædiktor klar, er det tid til at bruge de testdata, vi sætter til side, og teste nøjagtigheden af ​​vores model. Vi starter med at definere en hjælpefunktion, der sender dataene en linje ad gangen til vores inferensendepunkt og får en forudsigelse til gengæld. Fordi vi har en XGBoost model, dropper vi målvariablen, før vi sender CSV-linjen til slutpunktet. Derudover fjernede vi headeren fra test-CSV'en, før vi gik gennem filen, hvilket også er et andet krav til XGBoost på SageMaker. Se følgende kode:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

Følgende skærmbillede viser vores output.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Lad os nu beregne nøjagtigheden af ​​vores model.
Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.

Se følgende kode:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Vi får en nøjagtighed på 92%. Dette er lidt lavere end de 96 %, der blev opnået under valideringstrinnet, men det er stadig højt nok. Vi forventer ikke, at nøjagtigheden er helt den samme, fordi testen udføres med et nyt datasæt.

Dataindtagelse

Vi downloadede dataene direkte og konfigurerede dem til træning. I det virkelige liv skal du muligvis sende dataene direkte fra edge-enheden ind i datasøen og få SageMaker til at indlæse dem direkte fra datasøen ind i notesbogen.

Kinesis Data Firehose er en god mulighed og den mest ligetil måde at pålideligt indlæse streamingdata i datasøer, datalagre og analyseværktøjer. Det kan fange, transformere og indlæse streamingdata til Amazon S3 og andre AWS-datalagre.

Til vores brug opretter vi en Kinesis Data Firehose-leveringsstrøm med en Lambda-transformationsfunktion for at udføre en let datarensning, når den krydser strømmen. Se følgende kode:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Denne Lambda-funktion udfører lystransformation af de data, der streames fra enhederne til datasøen. Den forventer en CSV-formateret datafil.

Til indtagelsestrinnet downloader vi dataene og simulerer en datastrøm til Kinesis Data Firehose med en Lambda-transformationsfunktion og ind i vores S3-datasø.

Lad os simulere streaming af et par linjer:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Ryd op

Det er vigtigt at slette alle de ressourcer, der bruges i denne øvelse for at minimere omkostningerne. Følgende kode sletter det SageMaker-slutpunkt, vi oprettede, samt de trænings- og testdata, vi uploadede:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Konklusion

ML-ingeniører, dataforskere og softwareudviklere kan bruge Autopilot til at bygge og implementere en inferenspipeline med lidt eller ingen erfaring med ML-programmering. Autopilot sparer tid og ressourcer ved at bruge datavidenskab og bedste praksis for ML. Store organisationer kan nu flytte ingeniørressourcer væk fra infrastrukturkonfiguration til at forbedre modeller og løse business use cases. Startups og mindre organisationer kan komme i gang med maskinlæring med ringe eller ingen ML-ekspertise.

For at komme i gang med SageMaker Autopilot, se Produkt side eller få adgang til SageMaker Autopilot i SageMaker Studio.

Vi anbefaler også at lære mere om andre vigtige funktioner SageMaker har at tilbyde, såsom Amazon SageMaker Feature Store, som integreres med Amazon SageMaker Pipelines at oprette, tilføje funktionssøgning og -opdagelse og genbruge automatiserede ML-arbejdsgange. Du kan køre flere autopilot-simuleringer med forskellige funktions- eller målvarianter i dit datasæt. Du kan også gribe dette an som et dynamisk køretøjsallokeringsproblem, hvor din model forsøger at forudsige køretøjsefterspørgsel baseret på tid (såsom tidspunkt på dagen eller ugedag) eller placering eller en kombination af begge.


Om forfatterne

Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Doug Mbaya er senior løsningsarkitekt med fokus på data og analyse. Doug arbejder tæt sammen med AWS-partnere og hjælper dem med at integrere data- og analyseløsninger i skyen. Dougs tidligere erfaring omfatter support af AWS-kunder inden for kørselsdeling og levering af mad.

Automatiser en delt klassifikationsmodel for cykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Valerio Perrone er en Applied Science Manager, der arbejder på Amazon SageMaker Automatic Model Tuning og Autopilot.

Tidsstempel:

Mere fra AWS maskinindlæring