Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot segítségével

Amazon SageMaker Autopilot lehetővé teszi a szervezetek számára, hogy gyorsan felépítsenek és telepítsenek egy végpontok közötti gépi tanulási (ML) modellt és következtetési folyamatot, mindössze néhány sornyi kóddal vagy akár minden kód nélkül egyáltalán azzal Amazon SageMaker Studio. Az Autopilot tehermentesíti az infrastruktúra konfigurálásával járó nehéz terheket és a teljes csővezeték felépítéséhez szükséges időt, beleértve a funkciók tervezését, a modellválasztást és a hiperparaméterek hangolását.

Ebben a bejegyzésben bemutatjuk, hogyan lehet a nyers adatokból egy robusztus és teljesen kiépített következtetési folyamattá válni az Autopilot segítségével.

Megoldás áttekintése

Az általunk használt A Lyft nyilvános adatkészlete a kerékpáros megosztásról hogy ez a szimuláció megjósolja, hogy egy felhasználó részt vesz-e a Bike Share for All program. Ez egy egyszerű bináris osztályozási probléma.

Szeretnénk bemutatni, milyen egyszerű egy automatizált és valós idejű következtetési folyamat felépítése, amely a felhasználókat a Bike Share for All programban való részvételük alapján osztályozza. Ebből a célból szimulálunk egy végpontok közötti adatfeldolgozási és következtetési folyamatot egy képzeletbeli bikeshare cég számára, amely a San Francisco-öböl térségében működik.

Az architektúra két részre oszlik: a beviteli folyamatra és a következtetési folyamatra.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

A bejegyzés első részében elsősorban az ML-folyamatra koncentrálunk, a második részben pedig az adatfeldolgozási folyamatot tekintjük át.

Előfeltételek

A példa követéséhez hajtsa végre a következő előfeltételeket:

  1. Hozzon létre egy új SageMaker jegyzetfüzet példányt.
  2. Létrehozása Amazon Kinesis Data Firehose szállítási folyam egy AWS Lambda transzformációs függvény. Az utasításokat lásd Amazon Kinesis Firehose adatátalakítás AWS Lambdával. Ez a lépés nem kötelező, és csak az adatfolyam szimulálásához szükséges.

Adatfeltárás

Töltsük le és vizualizáljuk az adatkészletet, amely nyilvánosan található Amazon egyszerű tárolási szolgáltatás (Amazon S3) vödör és statikus weboldal:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

A következő képernyőkép az adatok egy részhalmazát mutatja az átalakítás előtt.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Az adatok utolsó oszlopa tartalmazza a megjósolni kívánt célt, amely egy igen vagy nem értéket felvevő bináris változó, amely jelzi, hogy a felhasználó részt vesz-e a Bike Share for All programban.

Vessünk egy pillantást a célváltozónk eloszlására bármilyen adatkiegyensúlyozatlanság esetén.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Ahogy a fenti grafikonon látható, az adatok kiegyensúlyozatlanok, kevesebben vesznek részt a programban.

Kiegyensúlyoznunk kell az adatokat, hogy elkerüljük a túlreprezentált torzítást. Ez a lépés nem kötelező, mert az Autopilot belső megközelítést is kínál az osztálykiegyensúlyozatlanság automatikus kezelésére, amely alapértelmezés szerint az F1 pontszám érvényesítési mérőszáma. Ezenkívül, ha úgy dönt, hogy saját maga kiegyensúlyozza az adatokat, fejlettebb technikákat is használhat az osztálykiegyensúlyozatlanság kezelésére, mint pl. SMOTE or GAN.

Ebben a bejegyzésben a többségi osztályt (No) mintavételezzük adatkiegyenlítési technikaként:

A következő kód gazdagítja az adatokat, és alulmintázza a felülreprezentált osztályt:

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

Szándékosan nem kódoltuk kategorikus jellemzőinket, beleértve a bináris célértékünket is. Ennek az az oka, hogy az Autopilot gondoskodik az adatok kódolásáról és dekódolásáról az automatikus szolgáltatástervezés és a folyamatok telepítése részeként, amint azt a következő részben láthatjuk.

Az alábbi képernyőkép egy mintát mutat az adatainkból.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

A következő grafikonok adatai egyébként normálisnak tűnnek, a bimodális eloszlás a délelőtti órák és a délutáni csúcsforgalom két csúcsát jelenti, ahogy az várható volt. Hétvégén és éjszaka is megfigyeljük az alacsony aktivitást.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

A következő részben betápláljuk az adatokat az Autopilotba, hogy az egy kísérletet tudjon futtatni számunkra.

Készítsen bináris osztályozási modellt

Az Autopilot megköveteli a bemeneti és kimeneti célcsoportok megadását. A bemeneti tárolót használja az adatok betöltésére, a kimeneti tárolót pedig a műtermékek, például a funkciótervezés és a generált Jupyter notebookok mentésére. Az adatkészlet 5%-át megtartjuk, hogy kiértékeljük és érvényesítsük a modell teljesítményét a betanítás után, és az adatkészlet 95%-át feltöltjük az S3 beviteli tárolóba. Lásd a következő kódot:

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

Miután feltöltöttük az adatokat a beviteli célhelyre, ideje elindítani az Autopilotot:

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

A kísérletezéshez csak a fit() metódust kell meghívnunk. Az Autopilotnak szüksége van az S3 bemeneti és kimeneti helyére, valamint a célattribútum oszlopra, mint szükséges paraméterekre. A jellemzők feldolgozása után az Autopilot hív SageMaker automatikus modelltuning hogy megtalálja a modell legjobb verzióját számos képzési feladat futtatásával az adatkészletén. Hozzáadtuk az opcionális max_candidates paramétert, hogy a jelöltek számát 30-ra korlátozzuk, ami az Autopilot által az algoritmusok és hiperparaméterek különböző kombinációival elindított képzési feladatok száma a legjobb modell megtalálása érdekében. Ha nem adja meg ezt a paramétert, az alapértelmezett értéke 250.

Az Autopilot folyamatát a következő kóddal figyelhetjük meg:

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

A képzés befejezése némi időt vesz igénybe. Amíg fut, nézzük meg az Autopilot munkafolyamatát.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

A legjobb jelölt megtalálásához használja a következő kódot:

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

A következő képernyőkép a kimenetünket mutatja.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Modellünk 96%-os érvényesítési pontosságot ért el, ezért telepíteni fogjuk. Hozzáadhatunk egy olyan feltételt, hogy csak akkor használjuk a modellt, ha a pontosság egy bizonyos szint felett van.

Következtetési csővezeték

Mielőtt bevezetnénk a modellünket, vizsgáljuk meg a legjobb jelöltünket és azt, hogy mi történik a folyamatban. Lásd a következő kódot:

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

Az alábbi diagram a kimenetünket mutatja.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Az Autopilot megépítette a modellt, és három különböző konténerbe csomagolta, amelyek mindegyike egy-egy feladatot futtat egymás után: transzformáció, előrejelzés és fordított átalakítás. Ez a többlépcsős következtetés lehetséges a SageMaker következtetési folyamat.

A többlépcsős következtetés több következtetési modellt is láncolhat. Például egy konténer képes működni főkomponens analízis mielőtt átadná az adatokat az XGBoost tárolónak.

Telepítse a következtetési folyamatot egy végpontra

A telepítési folyamat mindössze néhány kódsort foglal magában:

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

Állítsuk be a végpontunkat előrejelzéshez prediktorral:

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

Most, hogy készen van a végpontunk és a prediktorunk, ideje felhasználni a félretett tesztelési adatokat, és tesztelni a modellünk pontosságát. Kezdjük egy segédfüggvény definiálásával, amely soronként elküldi az adatokat a következtetési végpontunknak, és cserébe egy előrejelzést kap. Mert van egy XGBoost modellben eldobjuk a célváltozót, mielőtt elküldjük a CSV sort a végponthoz. Ezenkívül eltávolítottuk a fejlécet a tesztelő CSV-ből, mielőtt végignéznénk a fájlt, ami szintén egy másik követelmény a SageMaker XGBoostjához. Lásd a következő kódot:

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

A következő képernyőkép a kimenetünket mutatja.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Most számoljuk ki a modellünk pontosságát.
Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.

Lásd a következő kódot:

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

92%-os pontosságot kapunk. Ez valamivel alacsonyabb, mint az érvényesítési lépés során kapott 96%, de még mindig elég magas. Nem várjuk el, hogy a pontosság pontosan ugyanaz legyen, mert a tesztet új adatkészlettel hajtják végre.

Adatbevitel

Közvetlenül letöltöttük az adatokat és konfiguráltuk a képzéshez. A való életben előfordulhat, hogy az adatokat közvetlenül a szélső eszközről kell elküldenie a Data Lake-be, és a SageMakerrel közvetlenül az adattóból kell betöltenie azokat a notebookba.

A Kinesis Data Firehose jó lehetőség, és a legegyszerűbb módja a streaming adatok adatlakokba, adattárakba és elemzőeszközökbe történő megbízható betöltésének. Képes streaming adatokat rögzíteni, átalakítani és betölteni az Amazon S3 és más AWS adattárakba.

Használati esetünkben létrehozunk egy Kinesis Data Firehose kézbesítési adatfolyamot Lambda transzformációs funkcióval, hogy némi könnyű adattisztítást végezzünk, miközben áthalad a folyamon. Lásd a következő kódot:

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

Ez a lambda funkció az eszközökről streamelt adatok fénytranszformációját hajtja végre az adattóba. CSV formátumú adatfájlt vár.

A beviteli lépéshez letöltjük az adatokat, és szimulálunk egy adatfolyamot a Kinesis Data Firehose-ba Lambda-transzformációs funkcióval és az S3 adattóba.

Szimuláljuk a streamelést néhány sorral:

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

Tisztítsuk meg

A költségek minimalizálása érdekében fontos, hogy törölje az ebben a gyakorlatban használt összes erőforrást. A következő kód törli az általunk létrehozott SageMaker következtetési végpontot, valamint a feltöltött képzési és tesztelési adatokat:

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

Következtetés

Az ML mérnökök, adattudósok és szoftverfejlesztők az Autopilot segítségével következtetési folyamatot hozhatnak létre és telepíthetnek kevés ML programozási tapasztalattal. Az Autopilot időt és erőforrásokat takarít meg az adattudomány és az ML legjobb gyakorlatainak használatával. A nagy szervezetek mostantól áthelyezhetik a mérnöki erőforrásokat az infrastruktúra-konfigurálásról a modellek fejlesztésére és az üzleti felhasználási esetek megoldására. Az induló vállalkozások és a kisebb szervezetek csekély ML szakértelemmel kezdhetik el a gépi tanulást.

A SageMaker Autopilot használatának megkezdéséhez lásd a Termékoldal vagy elérheti a SageMaker Autopilotot a SageMaker Studio-ban.

Azt is javasoljuk, hogy többet tudjon meg a SageMaker egyéb fontos funkcióiról, mint például a Amazon SageMaker Feature Store, amely integrálódik a Amazon SageMaker csővezetékek az automatizált ML-munkafolyamatok létrehozásához, hozzáadásához és szolgáltatáskereséséhez, valamint újrafelhasználásához. Futtathat több Autopilot szimulációt különböző jellemzőkkel vagy célváltozatokkal az adatkészletében. Megközelítheti ezt dinamikus járműkiosztási problémaként is, amelyben a modell megpróbálja megjósolni a járműigényt az idő (például a napszak vagy a hét napja) vagy a hely, illetve a kettő kombinációja alapján.


A szerzőkről

Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.Doug Mbaya Senior Solution építész, aki az adatokra és az elemzésekre összpontosít. A Doug szorosan együttműködik az AWS-partnerekkel, segítve őket az adatok és az elemzési megoldások felhőbe való integrálásában. Doug korábbi tapasztalatai közé tartozik az AWS-ügyfelek támogatása a fuvarmegosztási és ételszállítási szegmensben.

Automatizálja a megosztott kerékpárok és robogók osztályozási modelljét az Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence segítségével. Függőleges keresés. Ai.Valerio Perrone Alkalmazott tudományos menedzser, aki az Amazon SageMaker Automatic Model Tuning és Autopilot területén dolgozik.

Időbélyeg:

Még több AWS gépi tanulás