Semplificazione dell'elaborazione dei dati ETL su Talent.com con Amazon SageMaker | Servizi Web di Amazon

Semplificazione dell'elaborazione dei dati ETL su Talent.com con Amazon SageMaker | Servizi Web di Amazon

Questo post è stato scritto in collaborazione da Anatoly Khomenko, Machine Learning Engineer, e Abdenour Bezzouh, Chief Technology Officer di Talent.com.

Fondata nel 2011, talent.com aggrega annunci di lavoro retribuiti dai loro clienti e annunci di lavoro pubblici e ha creato una piattaforma unificata e facilmente ricercabile. Coprendo oltre 30 milioni di annunci di lavoro in più di 75 paesi e coprendo varie lingue, settori e canali di distribuzione, Talent.com soddisfa le diverse esigenze di chi cerca lavoro, collegando efficacemente milioni di persone in cerca di lavoro con opportunità di lavoro.

La missione di Talent.com è facilitare le connessioni della forza lavoro globale. Per raggiungere questo obiettivo, Talent.com aggrega annunci di lavoro provenienti da varie fonti sul web, offrendo a chi cerca lavoro l'accesso a un ampio pool di oltre 30 milioni di opportunità di lavoro su misura per le loro competenze ed esperienze. In linea con questa missione, Talent.com ha collaborato con AWS per sviluppare un motore di raccomandazione di lavoro all'avanguardia basato sul deep learning, volto ad assistere gli utenti nell'avanzamento della loro carriera.

Per garantire il funzionamento efficace di questo motore di raccomandazione di lavoro, è fondamentale implementare una pipeline di elaborazione dati su larga scala responsabile dell’estrazione e del perfezionamento delle funzionalità dagli elenchi di lavoro aggregati di Talent.com. Questa pipeline è in grado di elaborare 5 milioni di record giornalieri in meno di 1 ora e consente di elaborare più giorni di record in parallelo. Inoltre, questa soluzione consente una rapida implementazione in produzione. La fonte primaria di dati per questa pipeline è il formato JSON Lines, archiviato in Servizio di archiviazione semplice Amazon (Amazon S3) e partizionato per data. Ogni giorno, ciò si traduce nella generazione di decine di migliaia di file JSON Lines, con aggiornamenti incrementali che si verificano quotidianamente.

L'obiettivo principale di questa pipeline di elaborazione dei dati è facilitare la creazione delle funzionalità necessarie per la formazione e l'implementazione del motore di raccomandazione di lavoro su Talent.com. Vale la pena notare che questa pipeline deve supportare aggiornamenti incrementali e soddisfare i complessi requisiti di estrazione delle funzionalità necessari per i moduli di formazione e implementazione essenziali per il sistema di raccomandazione del lavoro. La nostra pipeline appartiene alla famiglia di processi ETL (estrazione, trasformazione e caricamento) generale che combina dati provenienti da più fonti in un ampio repository centrale.

Per ulteriori approfondimenti su come Talent.com e AWS hanno creato in collaborazione tecniche all'avanguardia di elaborazione del linguaggio naturale e di formazione di modelli di deep learning, utilizzando Amazon Sage Maker per creare un sistema di raccomandazione del lavoro, fare riferimento a Dal testo al lavoro da sogno: creazione di un suggeritore di lavoro basato sulla PNL su Talent.com con Amazon SageMaker. Il sistema include ingegneria delle funzionalità, progettazione dell'architettura del modello di deep learning, ottimizzazione degli iperparametri e valutazione del modello, in cui tutti i moduli vengono eseguiti utilizzando Python.

Questo post mostra come abbiamo utilizzato SageMaker per creare una pipeline di elaborazione dati su larga scala per preparare funzionalità per il motore di raccomandazione di lavoro su Talent.com. La soluzione risultante consente a un data scientist di ideare l'estrazione di funzionalità in un notebook SageMaker utilizzando librerie Python, come Scikit-Impara or PyTorche quindi di distribuire rapidamente lo stesso codice nella pipeline di elaborazione dei dati eseguendo l'estrazione delle funzionalità su larga scala. La soluzione non richiede il porting del codice di estrazione delle funzionalità per utilizzare PySpark, come richiesto durante l'utilizzo Colla AWS come soluzione ETL. La nostra soluzione può essere sviluppata e distribuita esclusivamente da un Data Scientist end-to-end utilizzando solo un SageMaker e non richiede la conoscenza di altre soluzioni ETL, come Batch AWS. Ciò può ridurre significativamente il tempo necessario per implementare la pipeline di Machine Learning (ML) in produzione. La pipeline viene gestita tramite Python e si integra perfettamente con i flussi di lavoro di estrazione delle funzionalità, rendendola adattabile a un'ampia gamma di applicazioni di analisi dei dati.

Panoramica della soluzione

Panoramica della pipeline ETL utilizzando SageMaker Processing

La pipeline è composta da tre fasi principali:

  1. Utilizzare un Elaborazione di Amazon SageMaker lavoro per gestire file JSONL non elaborati associati a un giorno specificato. È possibile elaborare più giorni di dati simultaneamente tramite processi di elaborazione separati.
  2. impiegare Colla AWS per la scansione dei dati dopo l'elaborazione di più giorni di dati.
  3. Carica le funzionalità elaborate per un intervallo di date specificato utilizzando SQL da un file Amazzone Atena tabella, quindi addestrare e distribuire il modello di raccomandazione del lavoro.

Elabora file JSONL grezzi

Elaboriamo file JSONL grezzi per un giorno specifico utilizzando un lavoro di elaborazione SageMaker. Il lavoro implementa l'estrazione delle funzionalità e la compattazione dei dati e salva le funzionalità elaborate in file Parquet con 1 milione di record per file. Sfruttiamo la parallelizzazione della CPU per eseguire l'estrazione delle funzionalità per ciascun file JSONL non elaborato in parallelo. I risultati dell'elaborazione di ciascun file JSONL vengono salvati in un file Parquet separato all'interno di una directory temporanea. Dopo che tutti i file JSONL sono stati elaborati, eseguiamo la compattazione di migliaia di piccoli file Parquet in più file con 1 milione di record per file. I file Parquet compattati vengono quindi caricati in Amazon S3 come output del processo di elaborazione. La compattazione dei dati garantisce una scansione efficiente e query SQL nelle fasi successive della pipeline.

Di seguito è riportato il codice di esempio per pianificare un processo di elaborazione SageMaker per un giorno specifico, ad esempio 2020-01-01, utilizzando SageMaker SDK. Il lavoro legge file JSONL non elaborati da Amazon S3 (ad esempio da s3://bucket/raw-data/2020/01/01) e salva i file Parquet compattati in Amazon S3 (ad esempio in s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

La seguente struttura del codice per lo script principale (processing_script.py) che esegue il lavoro SageMaker Processing è il seguente:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

La scalabilità è una caratteristica chiave della nostra pipeline. Innanzitutto, è possibile utilizzare più processi SageMaker Processing per elaborare i dati per diversi giorni contemporaneamente. In secondo luogo, evitiamo di caricare in memoria tutti i dati elaborati o grezzi in una sola volta, elaborando ogni giorno di dati specificato. Ciò consente l'elaborazione dei dati utilizzando tipi di istanza che non possono ospitare i dati di un'intera giornata nella memoria primaria. L'unico requisito è che il tipo di istanza sia in grado di caricare contemporaneamente in memoria N file JSONL non elaborati o file Parquet elaborati, dove N è il numero di processi di lavoro in uso.

Scansione dei dati elaborati utilizzando AWS Glue

Dopo che tutti i dati grezzi per più giorni sono stati elaborati, possiamo creare una tabella Athena dall'intero set di dati utilizzando un crawler AWS Glue. Noi usiamo il SDK AWS per panda (awswrangler) libreria per creare la tabella utilizzando il seguente snippet:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Carica le funzionalità elaborate per l'addestramento

Le funzionalità elaborate per un intervallo di date specificato possono ora essere caricate dalla tabella Athena utilizzando SQL e queste funzionalità possono quindi essere utilizzate per addestrare il modello di raccomandazione del lavoro. Ad esempio, il seguente frammento carica un mese di funzionalità elaborate in un DataFrame utilizzando il file awswrangler biblioteca:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Inoltre, l'uso di SQL per caricare le funzionalità elaborate per l'addestramento può essere esteso per adattarsi a vari altri casi d'uso. Ad esempio, possiamo applicare una pipeline simile per mantenere due tabelle Athena separate: una per memorizzare le impressioni degli utenti e un'altra per memorizzare i clic degli utenti su queste impressioni. Utilizzando le istruzioni join SQL, possiamo recuperare le impressioni su cui gli utenti hanno fatto clic o su cui non hanno fatto clic e quindi passare queste impressioni a un processo di addestramento del modello.

Vantaggi della soluzione

L'implementazione della soluzione proposta apporta numerosi vantaggi al nostro flusso di lavoro esistente, tra cui:

  • Implementazione semplificata – La soluzione consente di implementare l'estrazione delle funzionalità in Python utilizzando le librerie ML più diffuse. Inoltre, non è necessario che il codice venga trasferito in PySpark. Ciò semplifica l'estrazione delle funzionalità poiché lo stesso codice sviluppato da un data scientist in un notebook verrà eseguito da questa pipeline.
  • Percorso rapido verso la produzione – La soluzione può essere sviluppata e distribuita da un data scientist per eseguire l'estrazione di funzionalità su larga scala, consentendo loro di sviluppare un modello di raccomandazione ML rispetto a questi dati. Allo stesso tempo, la stessa soluzione può essere distribuita in produzione da un ingegnere ML con poche modifiche necessarie.
  • riutilizzabilità – La soluzione fornisce un modello riutilizzabile per l'estrazione di funzionalità su larga scala e può essere facilmente adattata per altri casi d'uso oltre alla creazione di modelli di raccomandazione.
  • EFFICIENZA – La soluzione offre buone prestazioni: elaborazione di un solo giorno del talent.comi dati di hanno richiesto meno di 1 ora.
  • Aggiornamenti incrementali – La soluzione supporta anche aggiornamenti incrementali. I nuovi dati giornalieri possono essere elaborati con un lavoro di elaborazione SageMaker e la posizione S3 contenente i dati elaborati può essere sottoposta nuovamente a scansione per aggiornare la tabella Athena. Possiamo anche utilizzare un lavoro cron per aggiornare i dati di oggi più volte al giorno (ad esempio ogni 3 ore).

Abbiamo utilizzato questa pipeline ETL per aiutare Talent.com a elaborare 50,000 file al giorno contenenti 5 milioni di record e abbiamo creato dati di training utilizzando funzionalità estratte da 90 giorni di dati grezzi da Talent.com, per un totale di 450 milioni di record in 900,000 file. La nostra pipeline ha aiutato Talent.com a creare e implementare il sistema di consigli in produzione in sole 2 settimane. La soluzione ha eseguito tutti i processi ML incluso ETL su Amazon SageMaker senza utilizzare altri servizi AWS. Il sistema di raccomandazione delle offerte di lavoro ha portato a un aumento dell'8.6% della percentuale di clic nei test A/B online rispetto a una precedente soluzione basata su XGBoost, contribuendo a connettere milioni di utenti di Talent.com a lavori migliori.

Conclusione

Questo post descrive la pipeline ETL che abbiamo sviluppato per l'elaborazione delle funzionalità per la formazione e l'implementazione di un modello di raccomandazione di lavoro su Talent.com. La nostra pipeline utilizza processi di elaborazione SageMaker per un'elaborazione efficiente dei dati e l'estrazione di funzionalità su larga scala. Il codice di estrazione delle funzionalità è implementato in Python consentendo l'uso delle librerie ML più diffuse per eseguire l'estrazione delle funzionalità su larga scala, senza la necessità di trasferire il codice per utilizzare PySpark.

Incoraggiamo i lettori a esplorare la possibilità di utilizzare la pipeline presentata in questo blog come modello per i loro casi d'uso in cui è richiesta l'estrazione di funzionalità su larga scala. La pipeline può essere sfruttata da un data scientist per creare un modello ML e la stessa pipeline può quindi essere adottata da un ingegnere ML per essere eseguita in produzione. Ciò può ridurre significativamente il tempo necessario per produrre la soluzione ML end-to-end, come nel caso di Talent.com. I lettori possono fare riferimento a tutorial per la configurazione e l'esecuzione dei processi di elaborazione SageMaker. Rimandiamo inoltre i lettori alla visione del post Dal testo al lavoro da sogno: creazione di un suggeritore di lavoro basato sulla PNL su Talent.com con Amazon SageMaker, in cui discutiamo delle tecniche di addestramento del modello di deep learning che utilizzano Amazon Sage Maker per costruire il sistema di raccomandazione del lavoro di Talent.com.


Circa gli autori

Dmitry BespalovDmitry Bespalov è un Senior Applied Scientist presso l'Amazon Machine Learning Solutions Lab, dove aiuta i clienti AWS in diversi settori ad accelerare l'adozione di IA e cloud.

Yi XiangYi Xiang è Applied Scientist II presso l'Amazon Machine Learning Solutions Lab, dove aiuta i clienti AWS di diversi settori ad accelerare l'adozione dell'intelligenza artificiale e del cloud.

Ton WangTon Wang è un Senior Applied Scientist presso l'Amazon Machine Learning Solutions Lab, dove aiuta i clienti AWS in diversi settori ad accelerare l'adozione di IA e cloud.

Anatolij KhomenkoAnatolij Khomenko è un ingegnere senior di machine learning presso talent.com con una passione per l'elaborazione del linguaggio naturale che abbina brave persone a buoni lavori.

Abdenour BezzouhAbdenour Bezzouh è un dirigente con oltre 25 anni di esperienza nella creazione e fornitura di soluzioni tecnologiche scalabili a milioni di clienti. Abdenour ha ricoperto la carica di Chief Technology Officer (CTO) presso talent.com quando il team AWS ha progettato ed eseguito questa particolare soluzione talent.com.

Yanjun QiYanjun Qi è Senior Applied Science Manager presso l'Amazon Machine Learning Solution Lab. Innova e applica l'apprendimento automatico per aiutare i clienti AWS ad accelerare l'adozione dell'intelligenza artificiale e del cloud.

Timestamp:

Di più da Apprendimento automatico di AWS