Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot

Amazon SageMaker Autopilot gjør det mulig for organisasjoner å raskt bygge og distribuere en ende-til-ende maskinlæringsmodell (ML) og slutningspipeline med bare noen få linjer med kode eller til og med uten noen kode i det hele tatt med Amazon SageMaker Studio. Autopilot avlaster det tunge løftet med å konfigurere infrastruktur og tiden det tar å bygge en hel pipeline, inkludert funksjonsteknikk, modellvalg og hyperparameterinnstilling.

I dette innlegget viser vi hvordan du går fra rådata til en robust og fullt utrullet inferenspipeline med autopilot.

Løsningsoversikt

Vi bruker Lyfts offentlige datasett om sykkeldeling for denne simuleringen for å forutsi hvorvidt en bruker deltar i Bike Share for All-programmet. Dette er et enkelt binært klassifiseringsproblem.

Vi ønsker å vise frem hvor enkelt det er å bygge en automatisert og sanntids slutningspipeline for å klassifisere brukere basert på deres deltakelse i Bike Share for All-programmet. For dette formål simulerer vi en ende-til-ende datainntak og slutningspipeline for et tenkt sykkelselskap som opererer i San Francisco Bay Area.

Arkitekturen er delt ned i to deler: inntaksrørledningen og inferensrørledningen.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Vi fokuserer først og fremst på ML-pipelinen i den første delen av dette innlegget, og gjennomgår datainntakspipelinen i den andre delen.

Forutsetninger

For å følge dette eksemplet, fullfør følgende forutsetninger:

  1. Opprett en ny SageMaker notatbokforekomst.
  2. Lag en Amazon Kinesis Data Firehose leveringsstrøm med en AWS Lambda transformere funksjon. For instruksjoner, se Amazon Kinesis Firehose Data Transformation med AWS Lambda. Dette trinnet er valgfritt og bare nødvendig for å simulere datastrømming.

Data leting

La oss laste ned og visualisere datasettet, som er plassert i en offentlig Amazon enkel lagringstjeneste (Amazon S3) bøtte og statisk nettsted:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

Følgende skjermbilde viser en delmengde av dataene før transformasjon.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Den siste kolonnen av dataene inneholder målet vi ønsker å forutsi, som er en binær variabel som tar enten en Ja eller Nei verdi, som indikerer om brukeren deltar i Bike Share for All-programmet.

La oss ta en titt på fordelingen av målvariabelen vår for enhver dataubalanse.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Som vist i grafen ovenfor, er dataene ubalanserte, med færre personer som deltar i programmet.

Vi må balansere dataene for å forhindre en overrepresentasjonsskjevhet. Dette trinnet er valgfritt fordi autopilot også tilbyr en intern tilnærming for å håndtere klasseubalanse automatisk, som standard er en F1-scorevalideringsberegning. I tillegg, hvis du velger å balansere dataene selv, kan du bruke mer avanserte teknikker for å håndtere klasseubalanse, som f.eks. SMOTE or GAN.

For dette innlegget nedsampler vi majoritetsklassen (Nei) som en databalanseringsteknikk:

Følgende kode beriker dataene og undersampler den overrepresenterte klassen:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Vi forlot bevisst våre kategoriske funksjoner ikke kodet, inkludert vår binære målverdi. Dette er fordi Autopilot tar seg av koding og dekoding av dataene for oss som en del av den automatiske funksjonsutviklingen og pipeline-distribusjonen, som vi ser i neste avsnitt.

Følgende skjermbilde viser et eksempel på dataene våre.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Dataene i de følgende grafene ser ellers normale ut, med en bimodal fordeling som representerer de to toppene for morgentimene og ettermiddagsrushet, som du forventer. Vi observerer også lave aktiviteter i helgene og om natten.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

I neste avsnitt mater vi dataene til autopiloten slik at den kan kjøre et eksperiment for oss.

Bygg en binær klassifiseringsmodell

Autopilot krever at vi spesifiserer inngangs- og utgangsdestinasjonsbøttene. Den bruker inndatabøtten til å laste inn dataene og utdatabøtten for å lagre artefaktene, for eksempel funksjonsteknikk og de genererte Jupyter-notatbøkene. Vi beholder 5 % av datasettet for å evaluere og validere modellens ytelse etter at opplæringen er fullført og laster opp 95 % av datasettet til S3-inndatabøtten. Se følgende kode:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Etter at vi har lastet opp dataene til inndatadestinasjonen, er det på tide å starte autopiloten:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Alt vi trenger for å begynne å eksperimentere er å kalle fit()-metoden. Autopiloten trenger inn- og utdata S3-plasseringen og målattributtkolonnen som de nødvendige parameterne. Etter funksjonsbehandling ringer autopiloten SageMaker automatisk modellinnstilling for å finne den beste versjonen av en modell ved å kjøre mange opplæringsjobber på datasettet ditt. Vi la til den valgfrie max_candidates-parameteren for å begrense antall kandidater til 30, som er antallet treningsjobber som Autopilot lanserer med ulike kombinasjoner av algoritmer og hyperparametre for å finne den beste modellen. Hvis du ikke spesifiserer denne parameteren, er den standard til 250.

Vi kan observere fremdriften til autopilot med følgende kode:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

Opplæringen tar litt tid å gjennomføre. Mens den kjører, la oss se på autopilotens arbeidsflyt.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

For å finne den beste kandidaten, bruk følgende kode:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

Følgende skjermbilde viser produksjonen vår.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Modellen vår oppnådde en valideringsnøyaktighet på 96 %, så vi kommer til å distribuere den. Vi kan legge til en betingelse slik at vi bare bruker modellen hvis nøyaktigheten er over et visst nivå.

Inferensrørledning

Før vi implementerer modellen vår, la oss undersøke vår beste kandidat og hva som skjer i vår pipeline. Se følgende kode:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Følgende diagram viser produksjonen vår.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Autopilot har bygget modellen og har pakket den i tre forskjellige beholdere, som hver kjører en spesifikk oppgave sekvensielt: transformere, forutsi og omvendt transformasjon. Denne flertrinnsslutningen er mulig med en SageMaker inferensrørledning.

En flertrinns slutning kan også kjede flere slutningsmodeller. For eksempel kan en beholder utføre hovedkomponentanalyse før du sender dataene til XGBoost-beholderen.

Distribuer inferensrørledningen til et endepunkt

Implementeringsprosessen involverer bare noen få linjer med kode:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

La oss konfigurere endepunktet vårt for prediksjon med en prediktor:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Nå som vi har endepunktet og prediktoren klar, er det på tide å bruke testdataene vi har satt til side og teste nøyaktigheten til modellen vår. Vi starter med å definere en verktøyfunksjon som sender dataene en linje om gangen til vårt inferensendepunkt og får en prediksjon i retur. Fordi vi har en Xgboost modell, dropper vi målvariabelen før vi sender CSV-linjen til endepunktet. I tillegg fjernet vi overskriften fra test-CSV-en før vi gikk gjennom filen, som også er et annet krav for XGBoost på SageMaker. Se følgende kode:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

Følgende skjermbilde viser produksjonen vår.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

La oss nå beregne nøyaktigheten til modellen vår.
Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.

Se følgende kode:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Vi får en nøyaktighet på 92 %. Dette er litt lavere enn de 96 % som ble oppnådd under valideringstrinnet, men det er fortsatt høyt nok. Vi forventer ikke at nøyaktigheten skal være nøyaktig den samme fordi testen utføres med et nytt datasett.

Inntak av data

Vi lastet ned dataene direkte og konfigurerte dem for trening. I det virkelige liv må du kanskje sende dataene direkte fra edge-enheten inn i datasjøen og la SageMaker laste den direkte fra datasjøen inn i den bærbare datamaskinen.

Kinesis Data Firehose er et godt alternativ og den enkleste måten å laste strømmedata pålitelig inn i datainnsjøer, datalagre og analyseverktøy. Den kan fange opp, transformere og laste strømmedata til Amazon S3 og andre AWS-datalagre.

For vårt bruk oppretter vi en Kinesis Data Firehose-leveringsstrøm med en Lambda-transformasjonsfunksjon for å gjøre noe lett datarensing når den krysser strømmen. Se følgende kode:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Denne Lambda-funksjonen utfører lystransformasjon av data som strømmes fra enhetene til datasjøen. Den forventer en CSV-formatert datafil.

For inntakstrinnet laster vi ned dataene og simulerer en datastrøm til Kinesis Data Firehose med en Lambda-transformasjonsfunksjon og inn i vår S3-datainnsjø.

La oss simulere strømming av noen få linjer:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Rydd opp

Det er viktig å slette alle ressursene som brukes i denne øvelsen for å minimere kostnadene. Følgende kode sletter SageMaker-inferensendepunktet vi opprettet, samt trenings- og testdataene vi lastet opp:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

konklusjonen

ML-ingeniører, dataforskere og programvareutviklere kan bruke Autopilot til å bygge og distribuere en inferenspipeline med liten eller ingen erfaring med ML-programmering. Autopilot sparer tid og ressurser ved å bruke datavitenskap og beste praksis for ML. Store organisasjoner kan nå flytte ingeniørressurser bort fra infrastrukturkonfigurasjon til å forbedre modeller og løse forretningsbruk. Startups og mindre organisasjoner kan komme i gang med maskinlæring med liten eller ingen ML-ekspertise.

For å komme i gang med SageMaker Autopilot, se produktside eller få tilgang til SageMaker Autopilot i SageMaker Studio.

Vi anbefaler også å lære mer om andre viktige funksjoner SageMaker har å tilby, for eksempel Amazon SageMaker Feature Store, som integreres med Amazon SageMaker-rørledninger å opprette, legge til funksjonssøk og oppdagelse og gjenbruke automatiserte ML-arbeidsflyter. Du kan kjøre flere autopilotsimuleringer med forskjellige funksjoner eller målvarianter i datasettet ditt. Du kan også nærme deg dette som et dynamisk kjøretøyallokeringsproblem der modellen din prøver å forutsi kjøretøyets etterspørsel basert på tid (som tid på dagen eller ukedagen) eller plassering, eller en kombinasjon av begge.


Om forfatterne

Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Doug Mbaya er en senior løsningsarkitekt med fokus på data og analyse. Doug jobber tett med AWS-partnere, og hjelper dem med å integrere data- og analyseløsninger i skyen. Dougs tidligere erfaring inkluderer å støtte AWS-kunder innen kjøredeling og matleveringssegmentet.

Automatiser en delt klassifiseringsmodell for sykler og scootere med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikalt søk. Ai.Valerio Perrone er en Applied Science Manager som jobber med Amazon SageMaker Automatic Model Tuning og Autopilot.

Tidstempel:

Mer fra AWS maskinlæring