Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot

Amazon SageMaker autopilot gör det möjligt för organisationer att snabbt bygga och distribuera en end-to-end maskininlärningsmodell (ML) och slutledningspipeline med bara några rader kod eller till och med utan någon kod alls med Amazon SageMaker Studio. Autopilot avlastar det tunga lyftet av att konfigurera infrastruktur och den tid det tar att bygga en hel pipeline, inklusive funktionsteknik, modellval och hyperparameterjustering.

I det här inlägget visar vi hur man går från rådata till en robust och fullt utplacerad slutledningspipeline med Autopilot.

Lösningsöversikt

Vi använder Lyfts offentliga dataset om cykeldelning för denna simulering att förutsäga om en användare deltar i eller inte Bike Share for All-programmet. Detta är ett enkelt binärt klassificeringsproblem.

Vi vill visa upp hur enkelt det är att bygga en automatiserad och realtidsinferenspipeline för att klassificera användare baserat på deras deltagande i Bike Share for All-programmet. För detta ändamål simulerar vi en end-to-end-dataintag och slutledningspipeline för ett imaginärt bikeshare-företag som är verksamt i San Francisco Bay Area.

Arkitekturen är uppdelad i två delar: intagspipeline och inferenspipeline.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Vi fokuserar i första hand på ML-pipeline i det första avsnittet av det här inlägget, och granskar dataintagspipeline i den andra delen.

Förutsättningar

För att följa detta exempel, fyll i följande förutsättningar:

  1. Skapa en ny SageMaker anteckningsbok-instans.
  2. Skapa ett Amazon Kinesis Data Firehose leveransström med en AWS Lambda omvandla funktion. För instruktioner, se Amazon Kinesis Firehose Data Transformation med AWS Lambda. Detta steg är valfritt och behövs endast för att simulera dataströmning.

Datautforskning

Låt oss ladda ner och visualisera datasetet, som är placerat i en offentlig Amazon enkel lagringstjänst (Amazon S3) hink och statisk webbplats:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

Följande skärmdump visar en delmängd av data före transformation.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Den sista kolumnen av data innehåller målet vi vill förutsäga, vilket är en binär variabel som tar antingen ett Ja eller Nej-värde, vilket indikerar om användaren deltar i Bike Share for All-programmet.

Låt oss ta en titt på fördelningen av vår målvariabel för eventuell dataobalans.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Som visas i diagrammet ovan är uppgifterna obalanserade, med färre personer som deltar i programmet.

Vi måste balansera uppgifterna för att förhindra en överrepresentationsbias. Det här steget är valfritt eftersom Autopilot också erbjuder ett internt tillvägagångssätt för att hantera klassobalans automatiskt, vilket som standard är ett F1-poängvalideringsmått. Om du dessutom väljer att balansera data själv kan du använda mer avancerade tekniker för att hantera klassobalans, som t.ex. slog or GAN.

För det här inlägget nedsamplar vi majoritetsklassen (Nej) som en databalanseringsteknik:

Följande kod berikar data och undersamplar den överrepresenterade klassen:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Vi lämnade medvetet våra kategoriska funktioner inte kodade, inklusive vårt binära målvärde. Detta beror på att Autopilot tar hand om att koda och avkoda data åt oss som en del av den automatiska funktionsutvecklingen och pipeline-distributionen, som vi ser i nästa avsnitt.

Följande skärmdump visar ett exempel på vår data.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Data i följande grafer ser annars normala ut, med en bimodal fördelning som representerar de två topparna för morgontimmarna och eftermiddagsrusningen, som du kan förvänta dig. Vi observerar även låga aktiviteter på helger och nattetid.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

I nästa avsnitt matar vi data till Autopilot så att den kan köra ett experiment åt oss.

Bygg en binär klassificeringsmodell

Autopilot kräver att vi anger ingångs- och utmatningsdestinationshinkarna. Den använder input-bucket för att ladda data och output-bucket för att spara artefakterna, såsom funktionsteknik och de genererade Jupyter-anteckningsböckerna. Vi behåller 5 % av datamängden för att utvärdera och validera modellens prestanda efter att utbildningen är klar och laddar upp 95 % av datasetet till S3-inmatningshinken. Se följande kod:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

När vi laddat upp data till inmatningsdestinationen är det dags att starta autopiloten:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

Allt vi behöver för att börja experimentera är att anropa metoden fit(). Autopiloten behöver in- och utdata S3-platsen och målattributkolumnen som nödvändiga parametrar. Efter funktionsbearbetning ringer autopiloten SageMaker automatisk modelljustering för att hitta den bästa versionen av en modell genom att köra många träningsjobb på din datauppsättning. Vi lade till den valfria parametern max_candidates för att begränsa antalet kandidater till 30, vilket är antalet utbildningsjobb som Autopilot lanserar med olika kombinationer av algoritmer och hyperparametrar för att hitta den bästa modellen. Om du inte anger den här parametern är den som standard 250.

Vi kan observera utvecklingen av autopilot med följande kod:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

Utbildningen tar lite tid att genomföra. Medan den körs, låt oss titta på autopilotens arbetsflöde.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

För att hitta den bästa kandidaten, använd följande kod:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

Följande skärmdump visar vår produktion.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Vår modell uppnådde en valideringsnoggrannhet på 96 %, så vi kommer att distribuera den. Vi skulle kunna lägga till ett villkor så att vi bara använder modellen om noggrannheten är över en viss nivå.

Inferenspipeline

Innan vi implementerar vår modell, låt oss undersöka vår bästa kandidat och vad som händer i vår pipeline. Se följande kod:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Följande diagram visar vår produktion.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Autopiloten har byggt modellen och har paketerat den i tre olika behållare, var och en sekventiellt kör en specifik uppgift: transformera, förutsäga och omvänd transformation. Denna flerstegs slutledning är möjlig med en SageMaker inferenspipeline.

En slutledning i flera steg kan också kedja flera slutledningsmodeller. Till exempel kan en behållare utföra huvudkomponentanalys innan du skickar data till XGBoost-behållaren.

Distribuera slutledningsledningen till en slutpunkt

Implementeringsprocessen involverar bara några rader kod:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Låt oss konfigurera vår slutpunkt för förutsägelse med en prediktor:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Nu när vi har vår slutpunkt och prediktor redo, är det dags att använda testdata som vi lägger åt sidan och testa vår modells noggrannhet. Vi börjar med att definiera en hjälpfunktion som skickar data en rad i taget till vår slutpunkt för slutledning och får en förutsägelse i gengäld. Eftersom vi har en XGBoost modell, släpper vi målvariabeln innan vi skickar CSV-raden till slutpunkten. Dessutom tog vi bort rubriken från test-CSV-filen innan vi gick igenom filen, vilket också är ett annat krav för XGBoost på SageMaker. Se följande kod:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

Följande skärmdump visar vår produktion.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Låt oss nu beräkna noggrannheten hos vår modell.
Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.

Se följande kod:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

Vi får en noggrannhet på 92%. Detta är något lägre än de 96 % som erhölls under valideringssteget, men det är fortfarande tillräckligt högt. Vi förväntar oss inte att noggrannheten är exakt densamma eftersom testet utförs med en ny datamängd.

Upptagning av data

Vi laddade ner data direkt och konfigurerade den för träning. I verkligheten kan du behöva skicka data direkt från edge-enheten till datasjön och låta SageMaker ladda den direkt från datasjön till den bärbara datorn.

Kinesis Data Firehose är ett bra alternativ och det enklaste sättet att på ett tillförlitligt sätt ladda strömmande data till datasjöar, datalager och analysverktyg. Det kan fånga, omvandla och ladda strömmande data till Amazon S3 och andra AWS-databutiker.

För vårt användningsfall skapar vi en Kinesis Data Firehose-leveransström med en Lambda-transformationsfunktion för att göra lite lätt datarensning när den korsar strömmen. Se följande kod:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Denna Lambda-funktion utför ljustransformation av data som strömmas från enheterna till datasjön. Den förväntar sig en CSV-formaterad datafil.

För intagssteget laddar vi ner data och simulerar en dataström till Kinesis Data Firehose med en Lambda-transformeringsfunktion och in i vår S3-datasjö.

Låt oss simulera streaming några rader:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Städa upp

Det är viktigt att ta bort alla resurser som används i den här övningen för att minimera kostnaderna. Följande kod tar bort SageMaker slutpunkten som vi skapade samt tränings- och testdata som vi laddade upp:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Slutsats

ML-ingenjörer, datavetare och mjukvaruutvecklare kan använda Autopilot för att bygga och distribuera en inferenspipeline med liten eller ingen erfarenhet av ML-programmering. Autopilot sparar tid och resurser genom att använda datavetenskap och bästa metoder för ML. Stora organisationer kan nu flytta tekniska resurser bort från infrastrukturkonfiguration till att förbättra modeller och lösa affärsanvändningsfall. Nystartade företag och mindre organisationer kan komma igång med maskininlärning med liten eller ingen ML-expertis.

För att komma igång med SageMaker Autopilot, se Produktsida eller gå till SageMaker Autopilot i SageMaker Studio.

Vi rekommenderar också att du lär dig mer om andra viktiga funktioner SageMaker har att erbjuda, till exempel Amazon SageMaker Feature Store, som integreras med Amazon SageMaker-rörledningar för att skapa, lägga till funktionssökning och upptäckt och återanvända automatiserade ML-arbetsflöden. Du kan köra flera autopilotsimuleringar med olika funktions- eller målvarianter i din datauppsättning. Du kan också närma dig detta som ett dynamiskt fordonsallokeringsproblem där din modell försöker förutsäga fordonsefterfrågan baserat på tid (som tid på dygnet eller veckodag) eller plats, eller en kombination av båda.


Om författarna

Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Doug Mbaya är en senior lösningsarkitekt med fokus på data och analys. Doug arbetar nära med AWS-partners och hjälper dem att integrera data- och analyslösningar i molnet. Dougs tidigare erfarenhet inkluderar att stötta AWS-kunder inom segmentet åkdelning och matleverans.

Automatisera en delad klassificeringsmodell för cyklar och skotrar med Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Valerio Perrone är en Applied Science Manager som arbetar med Amazon SageMaker Automatic Model Tuning och Autopilot.

Tidsstämpel:

Mer från AWS maskininlärning