Raționalizarea procesării datelor ETL la Talent.com cu Amazon SageMaker | Amazon Web Services

Raționalizarea procesării datelor ETL la Talent.com cu Amazon SageMaker | Amazon Web Services

Acest post este co-autor de Anatoly Khomenko, inginer de învățare automată și Abdenour Bezzouh, director de tehnologie la Talent.com.

Înființată în 2011, Talent.com cumulează listele de locuri de muncă plătite de la clienții lor și listele publice de locuri de muncă și a creat o platformă unificată, ușor de căutat. Acoperind peste 30 de milioane de locuri de muncă în peste 75 de țări și acoperind diverse limbi, industrii și canale de distribuție, Talent.com răspunde nevoilor diverse ale persoanelor în căutarea unui loc de muncă, conectând efectiv milioane de solicitanți de locuri de muncă cu oportunități de angajare.

Misiunea Talent.com este de a facilita conexiunile cu forța de muncă la nivel mondial. Pentru a realiza acest lucru, Talent.com adună liste de locuri de muncă din diverse surse de pe web, oferind persoanelor în căutarea unui loc de muncă acces la un grup extins de peste 30 de milioane de oportunități de angajare adaptate abilităților și experiențelor lor. În conformitate cu această misiune, Talent.com a colaborat cu AWS pentru a dezvolta un motor de ultimă oră de recomandare a locurilor de muncă, bazat pe învățarea profundă, menit să ajute utilizatorii să-și avanseze cariera.

Pentru a asigura funcționarea eficientă a acestui motor de recomandare a locurilor de muncă, este esențial să implementați o conductă de procesare a datelor la scară largă responsabilă cu extragerea și rafinarea caracteristicilor din listele de locuri de muncă agregate ale Talent.com. Această conductă este capabilă să proceseze 5 milioane de înregistrări zilnice în mai puțin de 1 oră și permite procesarea mai multor zile de înregistrări în paralel. În plus, această soluție permite o implementare rapidă în producție. Sursa principală de date pentru această conductă este formatul JSON Lines, stocat în Serviciul Amazon de stocare simplă (Amazon S3) și împărțite după dată. În fiecare zi, acest lucru are ca rezultat generarea a zeci de mii de fișiere JSON Lines, cu actualizări incrementale care au loc zilnic.

Obiectivul principal al acestei conducte de procesare a datelor este de a facilita crearea de funcții necesare pentru instruirea și implementarea motorului de recomandare a locurilor de muncă pe Talent.com. Este demn de remarcat faptul că această conductă trebuie să accepte actualizări incrementale și să răspundă cerințelor complexe de extragere a caracteristicilor necesare modulelor de instruire și implementare esențiale pentru sistemul de recomandare a locurilor de muncă. Conducta noastră aparține familiei generale de procese ETL (extragere, transformare și încărcare) care combină date din mai multe surse într-un depozit central mare.

Pentru mai multe informații despre modul în care Talent.com și AWS au construit în colaborare tehnici de ultimă oră de procesare a limbajului natural și modele de învățare profundă, folosind Amazon SageMaker pentru a crea un sistem de recomandare de locuri de muncă, consultați De la text la jobul de vis: construiți un recomandator de locuri de muncă bazat pe NLP la Talent.com cu Amazon SageMaker. Sistemul include inginerie de caracteristici, proiectare a arhitecturii modelului de învățare profundă, optimizare a hiperparametrilor și evaluare a modelului, unde toate modulele sunt rulate folosind Python.

Această postare arată cum am folosit SageMaker pentru a construi o conductă de procesare a datelor la scară largă pentru pregătirea funcțiilor pentru motorul de recomandare a locurilor de muncă de la Talent.com. Soluția rezultată permite unui Data Scientist să conceapă extragerea de caracteristici într-un notebook SageMaker folosind biblioteci Python, cum ar fi Scikit-Learn or PyTorch, apoi pentru a implementa rapid același cod în conducta de procesare a datelor, efectuând extragerea caracteristicilor la scară. Soluția nu necesită portarea codului de extracție a caracteristicilor pentru a utiliza PySpark, așa cum este necesar atunci când se utilizează AWS Adeziv ca soluție ETL. Soluția noastră poate fi dezvoltată și implementată exclusiv de către un Data Scientist end-to-end folosind doar un SageMaker și nu necesită cunoștințe despre alte soluții ETL, cum ar fi Lot AWS. Acest lucru poate scurta semnificativ timpul necesar pentru implementarea conductei de învățare automată (ML) în producție. Conducta este operată prin Python și se integrează perfect cu fluxurile de lucru de extragere a caracteristicilor, făcându-l adaptabil la o gamă largă de aplicații de analiză a datelor.

Prezentare generală a soluțiilor

Prezentare generală pentru pipeline ETL folosind SageMaker Processing

Conducta este compusă din trei faze principale:

  1. Utilizați un Procesare Amazon SageMaker job pentru a gestiona fișierele JSONL brute asociate cu o zi specificată. Mai multe zile de date pot fi procesate prin lucrări separate de procesare simultan.
  2. Angaja AWS Adeziv pentru accesarea cu crawlere a datelor după procesarea mai multor zile de date.
  3. Încărcați caracteristicile procesate pentru un interval de date specificat utilizând SQL dintr-un Amazon Atena tabel, apoi antrenați și implementați modelul de recomandare de locuri de muncă.

Procesați fișiere JSONL brute

Procesăm fișiere JSONL brute pentru o zi specificată folosind o lucrare de procesare SageMaker. Lucrarea implementează extragerea caracteristicilor și compactarea datelor și salvează caracteristicile procesate în fișiere Parquet cu 1 milion de înregistrări per fișier. Profităm de paralelizarea CPU pentru a efectua extragerea caracteristicilor pentru fiecare fișier JSONL brut în paralel. Rezultatele procesării fiecărui fișier JSONL sunt salvate într-un fișier Parquet separat într-un director temporar. După ce toate fișierele JSONL au fost procesate, efectuăm compactarea a mii de fișiere Parchet mici în mai multe fișiere cu 1 milion de înregistrări per fișier. Fișierele compactate Parquet sunt apoi încărcate în Amazon S3 ca rezultat al jobului de procesare. Compactarea datelor asigură accesarea cu crawlere eficientă și interogări SQL în etapele următoare ale conductei.

Următorul este exemplul de cod pentru a programa o lucrare de procesare SageMaker pentru o zi specificată, de exemplu 2020-01-01, folosind SDK-ul SageMaker. Lucrarea citește fișiere JSONL brute de pe Amazon S3 (de exemplu, din s3://bucket/raw-data/2020/01/01) și salvează fișierele compactate Parquet în Amazon S3 (de exemplu pentru s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Următoarea schiță de cod pentru scriptul principal (processing_script.py) care rulează jobul de procesare SageMaker este următorul:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

Scalabilitatea este o caracteristică cheie a conductei noastre. În primul rând, mai multe lucrări SageMaker Processing pot fi utilizate pentru a procesa date timp de mai multe zile simultan. În al doilea rând, evităm să încărcăm în memorie toate datele procesate sau brute simultan, în timp ce procesăm fiecare zi specificată de date. Acest lucru permite procesarea datelor folosind tipuri de instanțe care nu pot găzdui o zi întreagă de date în memoria primară. Singura cerință este ca tipul de instanță să fie capabil să încarce simultan N fișiere JSONL brute sau procesate Parquet în memorie, N fiind numărul de lucrători de proces în uz.

Accesați cu crawlere datele procesate folosind AWS Glue

După ce toate datele brute pentru mai multe zile au fost procesate, putem crea un tabel Athena din întregul set de date utilizând un crawler AWS Glue. Noi folosim AWS SDK pentru panda (awswrangler) bibliotecă pentru a crea tabelul folosind următorul fragment:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Încărcați funcțiile procesate pentru antrenament

Caracteristicile procesate pentru un interval de date specificat pot fi acum încărcate din tabelul Athena utilizând SQL, iar aceste caracteristici pot fi apoi utilizate pentru antrenarea modelului de recomandare de locuri de muncă. De exemplu, următorul fragment încarcă o lună de caracteristici procesate într-un DataFrame folosind awswrangler bibliotecă:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

În plus, utilizarea SQL pentru încărcarea caracteristicilor procesate pentru instruire poate fi extinsă pentru a găzdui diverse alte cazuri de utilizare. De exemplu, putem aplica o conductă similară pentru a menține două tabele Athena separate: unul pentru stocarea afișărilor utilizatorilor și altul pentru stocarea clicurilor utilizatorilor pe aceste afișări. Folosind instrucțiunile SQL join, putem prelua afișările pe care utilizatorii fie au făcut clic, fie pe care nu le-au făcut clic, apoi putem transmite aceste afișări unui job de formare a modelului.

Beneficiile soluției

Implementarea soluției propuse aduce mai multe avantaje fluxului nostru de lucru existent, inclusiv:

  • Implementare simplificată – Soluția permite extragerea caracteristicilor să fie implementată în Python folosind biblioteci ML populare. Și, nu necesită ca codul să fie portat în PySpark. Acest lucru simplifică extracția caracteristicilor, deoarece același cod dezvoltat de un Data Scientist într-un notebook va fi executat de această conductă.
  • Cale rapidă către producție – Soluția poate fi dezvoltată și implementată de un Data Scientist pentru a efectua extragerea caracteristicilor la scară, permițându-le să dezvolte un model de recomandare ML pe baza acestor date. În același timp, aceeași soluție poate fi implementată în producție de către un inginer ML, cu mici modificări necesare.
  • Abilitatea de Reus – Soluția oferă un model reutilizabil pentru extragerea caracteristicilor la scară și poate fi adaptată cu ușurință pentru alte cazuri de utilizare, dincolo de construirea modelelor de recomandare.
  • Eficiență – Soluția oferă performanțe bune: procesarea unei singure zile a Talent.comdatele lui au durat mai puțin de 1 oră.
  • Actualizări incrementale – Soluția acceptă și actualizări incrementale. Noile date zilnice pot fi procesate cu un job de procesare SageMaker, iar locația S3 care conține datele procesate poate fi accesată din nou pentru a actualiza tabelul Athena. De asemenea, putem folosi un job cron pentru a actualiza datele de astăzi de mai multe ori pe zi (de exemplu, la fiecare 3 ore).

Am folosit această conductă ETL pentru a ajuta Talent.com să proceseze 50,000 de fișiere pe zi care conțin 5 milioane de înregistrări și am creat date de antrenament folosind funcții extrase din 90 de zile de date brute de pe Talent.com - un total de 450 de milioane de înregistrări în 900,000 de fișiere. Conducta noastră a ajutat Talent.com să construiască și să implementeze sistemul de recomandare în producție în doar 2 săptămâni. Soluția a efectuat toate procesele ML, inclusiv ETL pe Amazon SageMaker, fără a utiliza alt serviciu AWS. Sistemul de recomandare de locuri de muncă a determinat o creștere cu 8.6% a ratei de clic în testarea A/B online față de o soluție anterioară bazată pe XGBoost, ajutând să conecteze milioane de utilizatori Talent.com la locuri de muncă mai bune.

Concluzie

Această postare prezintă canalul ETL pe care l-am dezvoltat pentru procesarea caracteristicilor pentru instruirea și implementarea unui model de recomandare de locuri de muncă la Talent.com. Conducta noastră utilizează joburi de procesare SageMaker pentru procesarea eficientă a datelor și extragerea caracteristicilor la scară largă. Codul de extracție a caracteristicilor este implementat în Python, permițând utilizarea bibliotecilor ML populare pentru a efectua extragerea caracteristicilor la scară, fără a fi nevoie de a porta codul pentru a utiliza PySpark.

Încurajăm cititorii să exploreze posibilitatea de a utiliza conducta prezentată în acest blog ca șablon pentru cazurile lor de utilizare în care este necesară extragerea caracteristicilor la scară. Conducta poate fi folosită de un Data Scientist pentru a construi un model ML, iar aceeași conductă poate fi apoi adoptată de un inginer ML pentru a rula în producție. Acest lucru poate reduce semnificativ timpul necesar pentru producerea soluției ML de la capăt la capăt, așa cum a fost cazul Talent.com. Cititorii se pot referi la tutorial pentru configurarea și rularea joburilor de procesare SageMaker. De asemenea, recomandăm cititorii să vizualizeze postarea De la text la jobul de vis: construiți un recomandator de locuri de muncă bazat pe NLP la Talent.com cu Amazon SageMaker, unde discutăm tehnicile de antrenament ale modelului de învățare profundă folosind Amazon SageMaker pentru a construi sistemul de recomandare de locuri de muncă al Talent.com.


Despre autori

Dmitri BespalovDmitri Bespalov este cercetător senior aplicat la Amazon Machine Learning Solutions Lab, unde ajută clienții AWS din diferite industrii să-și accelereze adoptarea AI și a cloud-ului.

Yi XiangYi Xiang este Applied Scientist II la Amazon Machine Learning Solutions Lab, unde ajută clienții AWS din diferite industrii să-și accelereze adoptarea AI și a cloud-ului.

Tong WangTong Wang este cercetător senior aplicat la Amazon Machine Learning Solutions Lab, unde ajută clienții AWS din diferite industrii să-și accelereze adoptarea AI și a cloud-ului.

Anatoli KhomenkoAnatoli Khomenko este inginer senior de învățare automată la Talent.com cu o pasiune pentru procesarea limbajului natural, potrivirea oamenilor buni cu locuri de muncă bune.

Abdenour BezzouhAbdenour Bezzouh este un director cu peste 25 de ani de experiență în construirea și furnizarea de soluții tehnologice care se extind la milioane de clienți. Abdenour a deținut poziția de Chief Technology Officer (CTO) la Talent.com când echipa AWS a proiectat și executat această soluție specială pentru Talent.com.

Yanjun QiYanjun Qi este manager senior de științe aplicate la Amazon Machine Learning Solution Lab. Ea inovează și aplică învățarea automată pentru a ajuta clienții AWS să-și accelereze adoptarea AI și a cloud-ului.

Timestamp-ul:

Mai mult de la Învățare automată AWS