Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

Amazon SageMaker Autopilot으로 공유 자전거 및 스쿠터 분류 모델 자동화

Amazon SageMaker 자동 조종 장치 조직이 단 몇 줄의 코드 또는 코드 없이 전혀 아마존 세이지 메이커 스튜디오. Autopilot은 기능 엔지니어링, 모델 선택, 초매개변수 조정을 포함하여 전체 파이프라인을 구축하는 데 걸리는 시간과 인프라 구성의 부담을 덜어줍니다.

이 게시물에서는 Autopilot을 사용하여 원시 데이터에서 강력하고 완전히 배포된 추론 파이프라인으로 이동하는 방법을 보여줍니다.

솔루션 개요

우리는 사용 자전거 공유에 대한 Lyft의 공개 데이터 세트 이 시뮬레이션에서 사용자가 참여 여부를 예측하려면 모든 프로그램을 위한 자전거 공유. 이것은 간단한 이진 분류 문제입니다.

우리는 모두를 위한 자전거 공유 프로그램에 대한 참여를 기반으로 사용자를 분류하는 자동화된 실시간 추론 파이프라인을 구축하는 것이 얼마나 쉬운지 보여주고 싶습니다. 이를 위해 샌프란시스코 베이 지역에서 운영되는 가상의 자전거 공유 회사에 대한 종단 간 데이터 수집 및 추론 파이프라인을 시뮬레이션합니다.

아키텍처는 수집 파이프라인과 추론 파이프라인의 두 부분으로 나뉩니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

이 게시물의 첫 번째 섹션에서는 주로 ML 파이프라인에 초점을 맞추고 두 번째 부분에서는 데이터 수집 파이프라인을 검토합니다.

사전 조건

이 예를 따르려면 다음 전제 조건을 완료하십시오.

  1. 새 SageMaker 노트북 인스턴스 생성.
  2. 를 생성 아마존 키네 시스 데이터 파이어 호스 전송 스트림 AWS 람다 변환 기능. 지침은 다음을 참조하십시오. AWS Lambda를 사용한 Amazon Kinesis Firehose 데이터 변환. 이 단계는 선택 사항이며 데이터 스트리밍을 시뮬레이션하는 데만 필요합니다.

데이터 탐색

공개된 데이터세트를 다운로드하여 시각화해 보겠습니다. 아마존 단순 스토리지 서비스 (Amazon S3) 버킷 및 정적 웹 사이트:

# The dataset is located in a public bucket and static s3 website.
# https://www.lyft.com/bikes/bay-wheels/system-data import pandas as pd
import numpy as np
import os
from time import sleep !wget -q -O '201907-baywheels-tripdata.zip' https://s3.amazonaws.com/baywheels-data/201907-baywheels-tripdata.csv.zip
!unzip -q -o 201907-baywheels-tripdata.zip
csv_file = os.listdir('.')
data = pd.read_csv('201907-baywheels-tripdata.csv', low_memory=False)
data.head()

다음 스크린샷은 변환 전 데이터의 하위 집합을 보여줍니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

데이터의 마지막 열에는 예측하려는 대상이 포함되어 있습니다. 이 변수는 Yes 또는 No 값을 취하는 이진 변수로, 사용자가 Bike Share for All 프로그램에 참여하는지 여부를 나타냅니다.

데이터 불균형에 대한 대상 변수의 분포를 살펴보겠습니다.

# For plotting
%matplotlib inline
import matplotlib.pyplot as plt
#!pip install seaborn # If you need this library
import seaborn as sns
display(sns.countplot(x='bike_share_for_all_trip', data=data))

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

위의 그래프에서 볼 수 있듯이 프로그램에 참여하는 사람이 적어 데이터가 불균형합니다.

과잉 표현 편향을 방지하기 위해 데이터의 균형을 맞출 필요가 있습니다. Autopilot은 클래스 불균형을 자동으로 처리하는 내부 접근 방식도 제공하기 때문에 이 단계는 선택 사항입니다. 또한 데이터 균형을 직접 선택하는 경우 클래스 불균형을 처리하기 위해 다음과 같은 고급 기술을 사용할 수 있습니다. 스 모트 or GAN.

이 게시물에서는 데이터 밸런싱 기술로 다수 클래스(No)를 다운샘플링합니다.

다음 코드는 데이터를 풍부하게 하고 과도하게 표현된 클래스를 과소 샘플링합니다.

df = data.copy()
df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time'])
df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown
df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month)
# Breaking the day in 4 parts: ['morning', 'afternoon', 'evening']
conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21),
]
choices = ['morning', 'afternoon', 'evening']
df['part_of_day'] = np.select(conditions, choices, default='night')
df.dropna(inplace=True) # Downsampling the majority to rebalance the data
# We are getting about an even distribution
df.sort_values(by='bike_share_for_all_trip', inplace=True)
slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1)
df = df[-slice_pointe:]
# The data is balanced now. Let's reshuffle the data
df = df.sample(frac=1).reset_index(drop=True)

바이너리 대상 값을 포함하여 의도적으로 범주형 기능을 인코딩하지 않은 상태로 두었습니다. 다음 섹션에서 볼 수 있듯이 Autopilot이 자동 기능 엔지니어링 및 파이프라인 배포의 일부로 데이터 인코딩 및 디코딩을 처리하기 때문입니다.

다음 스크린샷은 데이터 샘플을 보여줍니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

다음 그래프의 데이터는 예상대로 아침 시간과 오후 러시아워에 대한 두 피크를 나타내는 이중 모드 분포를 사용하여 정상으로 보입니다. 우리는 또한 주말과 밤에 낮은 활동을 관찰합니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

다음 섹션에서는 실험을 실행할 수 있도록 Autopilot에 데이터를 제공합니다.

이진 분류 모델 구축

Autopilot을 사용하려면 입력 및 출력 대상 버킷을 지정해야 합니다. 입력 버킷을 사용하여 데이터를 로드하고 출력 버킷을 사용하여 기능 엔지니어링 및 생성된 Jupyter 노트북과 같은 아티팩트를 저장합니다. 훈련이 완료된 후 모델의 성능을 평가 및 검증하기 위해 데이터 세트의 5%를 유지하고 데이터 세트의 95%를 S3 입력 버킷에 업로드합니다. 다음 코드를 참조하십시오.

import sagemaker
import boto3 # Let's define our storage.
# We will use the default sagemaker bucket and will enforce encryption bucket = sagemaker.Session().default_bucket() # SageMaker default bucket. #Encrypting the bucket
s3 = boto3.client('s3')
SSEConfig={ 'Rules': [ { 'ApplyServerSideEncryptionByDefault': { 'SSEAlgorithm': 'AES256', } }, ] }
s3.put_bucket_encryption(Bucket=bucket, ServerSideEncryptionConfiguration=SSEConfig) prefix = 'sagemaker-automl01' # prefix for ther bucket
role = sagemaker.get_execution_role() # IAM role object to use by SageMaker
sagemaker_session = sagemaker.Session() # Sagemaker API
region = sagemaker_session.boto_region_name # AWS Region # Where we will load our data input_path = "s3://{}/{}/automl_bike_train_share-1".format(bucket, prefix) output_path = "s3://{}/{}/automl_bike_output_share-1".format(bucket, prefix) # Spliting data in train/test set.
# We will use 95% of the data for training and the remainder for testing.
slice_point = int(df.shape[0] * 0.95) training_set = df[:slice_point] # 95%
testing_set = df[slice_point:] # 5% # Just making sure we have split it correctly
assert training_set.shape[0] + testing_set.shape[0] == df.shape[0] # Let's save the data locally and upload it to our s3 data location
training_set.to_csv('bike_train.csv')
testing_set.to_csv('bike_test.csv', header=False) # Uploading file the trasining set to the input bucket
sagemaker.s3.S3Uploader.upload(local_path='bike_train.csv', desired_s3_uri=input_path)

입력 대상에 데이터를 업로드한 후 Autopilot을 시작할 차례입니다.

from sagemaker.automl.automl import AutoML
# You give your job a name and provide the s3 path where you uploaded the data
bike_automl_binary = AutoML(role=role, target_attribute_name='bike_share_for_all_trip', output_path=output_path, max_candidates=30)
# Starting the training bike_automl_binary.fit(inputs=input_path, wait=False, logs=False)

실험을 시작하기 위해 필요한 것은 fit() 메서드를 호출하는 것입니다. Autopilot에는 입력 및 출력 S3 위치와 대상 속성 열이 필수 매개변수로 필요합니다. 기능 처리 후 Autopilot 호출 SageMaker 자동 모델 튜닝 데이터 세트에서 많은 훈련 작업을 실행하여 최상의 모델 버전을 찾습니다. 후보 수를 30개로 제한하기 위해 선택적 max_candidates 매개변수를 추가했습니다. 이는 Autopilot이 최상의 모델을 찾기 위해 알고리즘과 하이퍼파라미터의 다양한 조합으로 시작하는 훈련 작업 수입니다. 이 매개변수를 지정하지 않으면 기본값은 250입니다.

다음 코드를 사용하여 Autopilot의 진행 상황을 관찰할 수 있습니다.

# Let's monitor the progress this will take a while. Go grup some coffe.
from time import sleep def check_job_status(): return bike_automl_binary.describe_auto_ml_job()['AutoMLJobStatus'] def discribe(): return bike_automl_binary.describe_auto_ml_job() while True: print (check_job_status(), discribe()['AutoMLJobSecondaryStatus'], end='** ') if check_job_status() in ["Completed", "Failed"]: if "Failed" in check_job_status(): print(discribe()['FailureReason']) break sleep(20)

교육을 완료하는 데 약간의 시간이 걸립니다. 실행되는 동안 Autopilot 워크플로를 살펴보겠습니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

최적의 후보를 찾으려면 다음 코드를 사용하십시오.

# Let's take a look at the best candidate selected by AutoPilot
from IPython.display import JSON
def jsonView(obj, rootName=None): return JSON(obj, root=rootName, expanded=True) bestCandidate = bike_automl_binary.describe_auto_ml_job()['BestCandidate']
display(jsonView(bestCandidate['FinalAutoMLJobObjectiveMetric'], 'FinalAutoMLJobObjectiveMetric'))

다음 스크린 샷은 출력을 보여줍니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

우리 모델은 96%의 유효성 검사 정확도를 달성했으므로 배포할 것입니다. 정확도가 특정 수준 이상인 경우에만 모델을 사용하도록 조건을 추가할 수 있습니다.

추론 파이프라인

모델을 배포하기 전에 최상의 후보와 파이프라인에서 어떤 일이 일어나고 있는지 살펴보겠습니다. 다음 코드를 참조하십시오.

display(jsonView(bestCandidate['InferenceContainers'], 'InferenceContainers'))

다음 다이어그램은 출력을 보여줍니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

Autopilot은 모델을 빌드하고 각각 변환, 예측 및 역변환이라는 특정 작업을 순차적으로 실행하는 세 가지 다른 컨테이너에 패키지했습니다. 이 다단계 추론은 다음과 같이 가능합니다. SageMaker 추론 파이프라인.

다단계 추론은 여러 추론 모델을 연결할 수도 있습니다. 예를 들어 하나의 컨테이너는 다음을 수행할 수 있습니다. 주요 구성 요소 분석 XGBoost 컨테이너에 데이터를 전달하기 전에.

엔드포인트에 추론 파이프라인 배포

배포 프로세스에는 몇 줄의 코드만 포함됩니다.

# We chose to difine an endpoint name.
from datetime import datetime as dt
today = str(dt.today())[:10]
endpoint_name='binary-bike-share-' + today
endpoint = bike_automl_binary.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge', endpoint_name=endpoint_name, candidate=bestCandidate, wait=True)

예측자를 사용하여 예측을 위해 끝점을 구성해 보겠습니다.

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer
csv_serializer = CSVSerializer()
csv_deserializer = CSVDeserializer()
# Initialize the predictor
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker.Session(), serializer=csv_serializer, deserializer=csv_deserializer )

이제 끝점과 예측기가 준비되었으므로 따로 설정한 테스트 데이터를 사용하고 모델의 정확도를 테스트할 차례입니다. 한 번에 한 줄씩 데이터를 추론 끝점으로 보내고 그 대가로 예측을 가져오는 유틸리티 함수를 정의하는 것으로 시작합니다. 왜냐하면 우리는 XGBoost 모델에서 CSV 라인을 엔드포인트로 보내기 전에 대상 변수를 삭제합니다. 또한 파일을 반복하기 전에 테스트 CSV에서 헤더를 제거했으며 이는 SageMaker의 XGBoost에 대한 또 다른 요구 사항이기도 합니다. 다음 코드를 참조하십시오.

# The fuction takes 3 arguments: the file containing the test set,
# The predictor and finaly the number of lines to send for prediction.
# The function returns two Series: inferred and Actual.
def get_inference(file, predictor, n=1): infered = [] actual = [] with open(file, 'r') as csv: for i in range(n): line = csv.readline().split(',') #print(line) try: # Here we remove the target variable from the csv line before predicting observed = line.pop(14).strip('n') actual.append(observed) except: pass obj = ','.join(line) predicted = predictor.predict(obj)[0][0] infered.append(predicted) pd.Series(infered) data = {'Infered': pd.Series(infered), 'Observed': pd.Series(actual)} return pd.DataFrame(data=data) n = testing_set.shape[0] # The size of the testing data
inference_df = get_inference('bike_test.csv', predictor, n) inference_df['Binary_Result'] = (inference_df['Observed'] == inference_df['Infered'])
display(inference_df.head())

다음 스크린 샷은 출력을 보여줍니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

이제 모델의 정확도를 계산해 보겠습니다.
Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.

다음 코드를 참조하십시오.

count_binary = inference_df['Binary_Result'].value_counts()
accuracy = count_binary[True]/n
print('Accuracy:', accuracy)

92%의 정확도를 얻습니다. 이는 검증 단계에서 얻은 96%보다 약간 낮지만 여전히 충분히 높습니다. 테스트가 새 데이터 세트로 수행되기 때문에 정확도가 정확히 동일할 것으로 기대하지 않습니다.

데이터 수집

데이터를 직접 다운로드하여 교육용으로 구성했습니다. 실제로는 데이터를 에지 장치에서 데이터 레이크로 직접 보내고 SageMaker가 데이터 레이크에서 노트북으로 직접 로드하도록 해야 할 수 있습니다.

Kinesis Data Firehose는 스트리밍 데이터를 데이터 레이크, 데이터 저장소 및 분석 도구에 안정적으로 로드하는 좋은 옵션이자 가장 간단한 방법입니다. 스트리밍 데이터를 캡처, 변환 및 Amazon S3 및 기타 AWS 데이터 스토어로 로드할 수 있습니다.

우리의 사용 사례에서는 스트림을 통과할 때 간단한 데이터 정리를 수행하기 위해 Lambda 변환 기능이 있는 Kinesis Data Firehose 전송 스트림을 생성합니다. 다음 코드를 참조하십시오.

# Data processing libraries
import pandas as pd # Data processing
import numpy as np
import base64
from io import StringIO def lambda_handler(event, context): output = [] print('Received', len(event['records']), 'Records') for record in event['records']: payload = base64.b64decode(record['data']).decode('utf-8') df = pd.read_csv(StringIO(payload), index_col=0) df.drop(columns=['rental_access_method'], inplace=True) df['start_time'] = pd.to_datetime(df['start_time']) df['start_time'] = pd.to_datetime(df['end_time']) # Adding some day breakdown df = df.assign(day_of_week=df.start_time.dt.dayofweek, hour_of_day=df.start_time.dt.hour, trip_month=df.start_time.dt.month) # Breaking the day in 4 parts: ['morning', 'afternoon', 'evening'] conditions = [ (df['hour_of_day'] >= 5) & (df['hour_of_day'] < 12), (df['hour_of_day'] >= 12) & (df['hour_of_day'] < 18), (df['hour_of_day'] >= 18) & (df['hour_of_day'] < 21), ] choices = ['morning', 'afternoon', 'evening'] df['part_of_day'] = np.select(conditions, choices, default='night') df.dropna(inplace=True) # Downsampling the majority to rebalance the data # We are getting about an even distribution df.sort_values(by='bike_share_for_all_trip', inplace=True) slice_pointe = int(df['bike_share_for_all_trip'].value_counts()['Yes'] * 2.1) df = df[-slice_pointe:] # The data is balanced now. Let's reshuffle the data df = df.sample(frac=1).reset_index(drop=True) data = base64.b64encode(bytes(df.to_csv(), 'utf-8')).decode("utf-8") output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': data } output.append(output_record) print('Returned', len(output), 'Records') print('Event', event) return {'records': output}

이 Lambda 함수는 디바이스에서 스트리밍된 데이터를 데이터 레이크로 가벼운 변환을 수행합니다. CSV 형식의 데이터 파일이 필요합니다.

수집 단계에서는 데이터를 다운로드하고 Lambda 변환 기능을 사용하여 Kinesis Data Firehose와 S3 데이터 레이크로 데이터 스트림을 시뮬레이션합니다.

몇 줄 스트리밍을 시뮬레이션해 보겠습니다.

# Saving the data in one file.
file = '201907-baywheels-tripdata.csv' data.to_csv(file) # Stream the data 'n' lines at a time.
# Only run this for a minute and stop the cell
def streamer(file, n): with open(file, 'r') as csvfile: header = next(csvfile) data = header counter = 0 loop = True while loop == True: for i in range(n): line = csvfile.readline() data+=line # We reached the end of the csv file. if line == '': loop = False counter+=n # Use your kinesis streaming name stream = client.put_record(DeliveryStreamName='firehose12-DeliveryStream-OJYW71BPYHF2', Record={"Data": bytes(data, 'utf-8')}) data = header print( file, 'HTTPStatusCode: '+ str(stream['ResponseMetadata']['HTTPStatusCode']), 'csv_lines_sent: ' + str(counter), end=' -*- ') sleep(random.randrange(1, 3)) return
# Streaming for 500 lines at a time. You can change this number up and down.
streamer(file, 500) # We can now load our data as a DataFrame because it’s streamed into the S3 data lake:
# Getting data from s3 location where it was streamed.
STREAMED_DATA = 's3://firehose12-deliverybucket-11z0ya3patrod/firehose/2020'
csv_uri = sagemaker.s3.S3Downloader.list(STREAMED_DATA)
in_memory_string = [sagemaker.s3.S3Downloader.read_file(file) for file in csv_uri]
in_memory_csv = [pd.read_csv(StringIO(file), index_col=0) for file in in_memory_string]
display(df.tail())

정리

비용을 최소화하려면 이 연습에 사용된 모든 리소스를 삭제하는 것이 중요합니다. 다음 코드는 우리가 생성한 SageMaker 추론 엔드포인트와 업로드한 교육 및 테스트 데이터를 삭제합니다.

#Delete the s3 data
predictor.delete_endpoint() # Delete s3 data
s3 = boto3.resource('s3')
ml_bucket = sagemaker.Session().default_bucket()
delete_data = s3.Bucket(ml_bucket).objects.filter(Prefix=prefix).delete()

결론

ML 엔지니어, 데이터 과학자 및 소프트웨어 개발자는 Autopilot을 사용하여 ML 프로그래밍 경험이 거의 또는 전혀 없이 추론 파이프라인을 구축 및 배포할 수 있습니다. Autopilot은 데이터 과학 및 ML 모범 사례를 사용하여 시간과 리소스를 절약합니다. 이제 대규모 조직은 엔지니어링 리소스를 인프라 구성에서 모델 개선 및 비즈니스 사용 사례 해결로 이동할 수 있습니다. 스타트업과 소규모 조직은 ML 전문 지식이 거의 또는 전혀 없이 머신 러닝을 시작할 수 있습니다.

SageMaker Autopilot을 시작하려면 G 시리즈 페이지 또는 SageMaker Studio 내에서 SageMaker Autopilot에 액세스합니다.

다음과 같이 SageMaker가 제공해야 하는 다른 중요한 기능에 대해서도 자세히 알아보는 것이 좋습니다. Amazon SageMaker 기능 스토어, 와 통합 Amazon SageMaker 파이프 라인 자동화된 ML 워크플로를 만들고, 기능 검색 및 검색을 추가하고, 재사용합니다. 데이터세트에서 다양한 기능 또는 대상 변형으로 여러 Autopilot 시뮬레이션을 실행할 수 있습니다. 또한 모델이 시간(예: 시간 또는 요일) 또는 위치 또는 이 둘의 조합을 기반으로 차량 수요를 예측하려고 하는 동적 차량 할당 문제로 접근할 수 있습니다.


저자에 관하여

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.더그 음바야 데이터 및 분석에 중점을 둔 수석 솔루션 설계자입니다. Doug는 AWS 파트너와 긴밀하게 협력하여 클라우드에서 데이터 및 분석 솔루션을 통합할 수 있도록 지원합니다. Doug의 이전 경험에는 승차 공유 및 음식 배달 부문에서 AWS 고객 지원이 포함됩니다.

Amazon SageMaker Autopilot PlatoBlockchain Data Intelligence로 공유 자전거 및 스쿠터 분류 모델을 자동화하십시오. 수직 검색. 일체 포함.발레리오 페로네 Amazon SageMaker 자동 모델 조정 및 자동 조종 장치에서 일하는 응용 과학 관리자입니다.

타임 스탬프 :

더보기 AWS 기계 학습