Effektivisera ETL-databehandling på Talent.com med Amazon SageMaker | Amazon webbtjänster

Effektivisera ETL-databehandling på Talent.com med Amazon SageMaker | Amazon webbtjänster

Det här inlägget är medförfattare av Anatoly Khomenko, Machine Learning Engineer, och Abdenour Bezzouh, Chief Technology Officer på Talent.com.

Etablerat i 2011, Talent.com samlar ihop betalda jobbannonser från sina kunder och offentliga jobbannonser och har skapat en enhetlig, lättsökbar plattform. Talent.com täcker över 30 miljoner jobbannonser i mer än 75 länder och spänner över olika språk, branscher och distributionskanaler.

Talent.coms uppdrag är att underlätta globala arbetskraftskontakter. För att uppnå detta, samlar Talent.com platsannonser från olika källor på webben, och erbjuder arbetssökande tillgång till en omfattande pool med över 30 miljoner jobbmöjligheter skräddarsydda för deras kompetens och erfarenheter. I linje med detta uppdrag, samarbetade Talent.com med AWS för att utveckla en banbrytande jobbrekommendationsmotor som drivs av djup inlärning, som syftar till att hjälpa användare att avancera sina karriärer.

För att säkerställa en effektiv funktion av denna jobbrekommendationsmotor är det avgörande att implementera en storskalig databearbetningspipeline som ansvarar för att extrahera och förfina funktioner från Talent.coms aggregerade jobbannonser. Denna pipeline kan bearbeta 5 miljoner dagliga poster på mindre än 1 timme och möjliggör bearbetning av flera dagars poster parallellt. Dessutom möjliggör denna lösning en snabb distribution till produktion. Den primära datakällan för denna pipeline är JSON Lines-formatet, lagrat i Amazon enkel lagringstjänst (Amazon S3) och uppdelad efter datum. Varje dag resulterar detta i generering av tiotusentals JSON Lines-filer, med inkrementella uppdateringar som sker dagligen.

Det primära syftet med denna databehandlingspipeline är att underlätta skapandet av funktioner som är nödvändiga för att träna och distribuera jobbrekommendationsmotorn på Talent.com. Det är värt att notera att denna pipeline måste stödja inkrementella uppdateringar och tillgodose de intrikata funktionsextraktionskraven som är nödvändiga för utbildnings- och distributionsmodulerna som är nödvändiga för jobbrekommendationssystemet. Vår pipeline tillhör den allmänna ETL-processfamiljen (extrahera, transformera och ladda) som kombinerar data från flera källor till ett stort centralt arkiv.

För ytterligare insikter om hur Talent.com och AWS i samarbete byggde banbrytande naturligt språkbehandling och djupinlärningsmodeller, med hjälp av Amazon SageMaker för att skapa ett jobbrekommendationssystem, se Från text till drömjobb: Bygg en NLP-baserad jobbrekommendator på Talent.com med Amazon SageMaker. Systemet inkluderar funktionsteknik, design för djupinlärning av modellarkitektur, hyperparameteroptimering och modellutvärdering, där alla moduler körs med Python.

Det här inlägget visar hur vi använde SageMaker för att bygga en storskalig databehandlingspipeline för att förbereda funktioner för jobbrekommendationsmotorn på Talent.com. Den resulterande lösningen gör det möjligt för en dataforskare att skapa idéer om funktionsextraktion i en SageMaker-anteckningsbok med hjälp av Python-bibliotek, som t.ex. Scikit-Lär dig or PyTorch, och sedan för att snabbt distribuera samma kod i databehandlingspipelinen genom att utföra funktionsextraktion i stor skala. Lösningen kräver inte portering av funktionsextraktionskoden för att använda PySpark, som krävs vid användning AWS-lim som ETL-lösningen. Vår lösning kan utvecklas och distribueras enbart av en Data Scientist från början till slut med endast en SageMaker, och kräver inte kunskap om andra ETL-lösningar, som t.ex. AWS-batch. Detta kan avsevärt förkorta tiden som behövs för att distribuera pipeline för maskininlärning (ML) till produktion. Pipelinen drivs genom Python och integreras sömlöst med arbetsflöden för extraktion av funktioner, vilket gör den anpassningsbar till ett brett utbud av dataanalysapplikationer.

Lösningsöversikt

Översikt för ETL-pipeline med SageMaker Processing

Rörledningen består av tre primära faser:

  1. Använd en Amazon SageMaker-bearbetning jobb för att hantera råa JSONL-filer associerade med en angiven dag. Flera dagars data kan bearbetas av separata bearbetningsjobb samtidigt.
  2. Använda AWS-lim för datagenomsökning efter att ha bearbetat flera dagars data.
  3. Ladda bearbetade funktioner för ett angivet datumintervall med SQL från en Amazonas Athena tabell, träna och distribuera sedan jobbrekommendationsmodellen.

Bearbeta råa JSONL-filer

Vi bearbetar råa JSONL-filer för en angiven dag med ett SageMaker Processing-jobb. Jobbet implementerar funktionsextraktion och datakomprimering, och sparar bearbetade funktioner i Parkett-filer med 1 miljon poster per fil. Vi drar fördel av CPU-parallellisering för att utföra funktionsextraktion för varje rå JSONL-fil parallellt. Bearbetningsresultat för varje JSONL-fil sparas i en separat Parquet-fil i en tillfällig katalog. Efter att alla JSONL-filer har bearbetats utför vi komprimering av tusentals små parkettfiler till flera filer med 1 miljon poster per fil. De komprimerade parkettfilerna laddas sedan upp till Amazon S3 som utdata från bearbetningsjobbet. Datakomprimeringen säkerställer effektiv genomsökning och SQL-frågor i nästa steg av pipelinen.

Följande är exempelkoden för att schemalägga ett SageMaker Processing-jobb för en angiven dag, till exempel 2020-01-01, med SageMaker SDK. Jobbet läser råa JSONL-filer från Amazon S3 (till exempel från s3://bucket/raw-data/2020/01/01) och sparar de komprimerade Parquet-filerna i Amazon S3 (till exempel till s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Följande kodöversikt för huvudskriptet (processing_script.py) som kör SageMaker Processing-jobbet är som följer:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

Skalbarhet är en nyckelfunktion i vår pipeline. För det första kan flera SageMaker Processing-jobb användas för att behandla data i flera dagar samtidigt. För det andra undviker vi att ladda hela bearbetade eller rådata i minnet på en gång, samtidigt som vi bearbetar varje angiven datadag. Detta möjliggör bearbetning av data med hjälp av instanstyper som inte kan ta emot en hel dags data i primärminnet. Det enda kravet är att instanstypen ska kunna ladda N rå JSONL- eller bearbetade Parquet-filer i minnet samtidigt, där N är antalet processarbetare som används.

Genomsök bearbetade data med AWS Glue

Efter att alla rådata för flera dagar har bearbetats kan vi skapa en Athena-tabell från hela datasetet genom att använda en AWS Glue-crawler. Vi använder AWS SDK för pandor (awswrangler) bibliotek för att skapa tabellen med följande kodavsnitt:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Ladda bearbetade funktioner för träning

Bearbetade funktioner för ett specificerat datumintervall kan nu laddas från Athena-tabellen med SQL, och dessa funktioner kan sedan användas för att träna jobbrekommendationsmodellen. Till exempel laddar följande utdrag en månad av bearbetade funktioner i en DataFrame med hjälp av awswrangler bibliotek:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Dessutom kan användningen av SQL för att ladda bearbetade funktioner för utbildning utökas för att tillgodose olika andra användningsfall. Vi kan till exempel använda en liknande pipeline för att upprätthålla två separata Athena-tabeller: en för att lagra användarvisningar och en annan för att lagra användarklick på dessa visningar. Med hjälp av SQL join-satser kan vi hämta visningar som användare antingen klickade på eller inte klickade på och sedan skicka dessa visningar till ett modellutbildningsjobb.

Lösningsfördelar

Att implementera den föreslagna lösningen ger flera fördelar till vårt befintliga arbetsflöde, inklusive:

  • Förenklat genomförande – Lösningen gör att funktionsextraktion kan implementeras i Python med hjälp av populära ML-bibliotek. Och det kräver inte att koden porteras till PySpark. Detta effektiviserar funktionsextraktionen eftersom samma kod som utvecklats av en dataforskare i en anteckningsbok kommer att exekveras av denna pipeline.
  • Snabb väg till produktion – Lösningen kan utvecklas och distribueras av en dataforskare för att utföra funktionsextraktion i stor skala, vilket gör det möjligt för dem att utveckla en ML-rekommendationsmodell mot dessa data. Samtidigt kan samma lösning distribueras till produktion av en ML-ingenjör med små modifieringar som behövs.
  • reus Förmåga – Lösningen ger ett återanvändbart mönster för funktionsextraktion i stor skala och kan enkelt anpassas för andra användningsfall utöver byggnadsrekommendationsmodeller.
  • Effektivitet (CT-värde) – Lösningen erbjuder bra prestanda: bearbetning av en enda dag Talent.coms data tog mindre än 1 timme.
  • Inkrementella uppdateringar – Lösningen stöder även inkrementella uppdateringar. Ny daglig data kan bearbetas med ett SageMaker Processing-jobb, och S3-platsen som innehåller de bearbetade data kan genomsökas igen för att uppdatera Athena-tabellen. Vi kan också använda ett cron-jobb för att uppdatera dagens data flera gånger per dag (till exempel var tredje timme).

Vi använde denna ETL-pipeline för att hjälpa Talent.com att bearbeta 50,000 5 filer per dag som innehåller 90 miljoner poster, och skapade träningsdata med hjälp av funktioner som extraherades från 450 dagars rådata från Talent.com – totalt 900,000 miljoner poster över 2 8.6 filer. Vår pipeline hjälpte Talent.com att bygga och distribuera rekommendationssystemet i produktion inom bara två veckor. Lösningen utförde alla ML-processer inklusive ETL på Amazon SageMaker utan att använda andra AWS-tjänster. Jobrekommendationssystemet ledde till en XNUMX % ökning av klickfrekvensen i online A/B-tester mot en tidigare XGBoost-baserad lösning, vilket hjälpte koppla miljontals Talent.coms användare till bättre jobb.

Slutsats

Det här inlägget beskriver ETL-pipelinen som vi utvecklade för funktionsbearbetning för utbildning och implementering av en jobbrekommendationsmodell på Talent.com. Vår pipeline använder SageMaker Processing-jobb för effektiv databehandling och funktionsextraktion i stor skala. Funktionsextraktionskod är implementerad i Python, vilket möjliggör användning av populära ML-bibliotek för att utföra funktionsextraktion i stor skala, utan att behöva porta koden för att använda PySpark.

Vi uppmuntrar läsarna att utforska möjligheten att använda pipelinen som presenteras i den här bloggen som en mall för deras användningsfall där funktionsextraktion i stor skala krävs. Pipelinen kan utnyttjas av en dataforskare för att bygga en ML-modell, och samma pipeline kan sedan användas av en ML-ingenjör för att köras i produktion. Detta kan avsevärt minska den tid som krävs för att producera ML-lösningen från början till slut, som var fallet med Talent.com. Läsarna kan hänvisa till handledning för att ställa in och köra SageMaker Processing-jobb. Vi hänvisar även läsarna att se inlägget Från text till drömjobb: Bygg en NLP-baserad jobbrekommendator på Talent.com med Amazon SageMaker, där vi diskuterar modellträningstekniker för djupinlärning Amazon SageMaker för att bygga Talent.coms jobbrekommendationssystem.


Om författarna

Dmitrij BespalovDmitrij Bespalov är Senior Applied Scientist vid Amazon Machine Learning Solutions Lab, där han hjälper AWS-kunder inom olika branscher att påskynda deras AI och molninförande.

Yi XiangYi Xiang är en Applied Scientist II på Amazon Machine Learning Solutions Lab, där hon hjälper AWS-kunder inom olika branscher att påskynda deras AI och molninförande.

Tong WangTong Wang är Senior Applied Scientist vid Amazon Machine Learning Solutions Lab, där han hjälper AWS-kunder inom olika branscher att påskynda deras AI och molninförande.

Anatolij KhomenkoAnatolij Khomenko är Senior Machine Learning Engineer på Talent.com med en passion för naturlig språkbehandling matcha bra människor till bra jobb.

Abdenour BezzouhAbdenour Bezzouh är en chef med mer än 25 års erfarenhet av att bygga och leverera tekniska lösningar som skalas till miljontals kunder. Abdenour hade befattningen som Chief Technology Officer (CTO) vid Talent.com när AWS-teamet designade och utförde just denna lösning för Talent.com.

Yanjun QiYanjun Qi är Senior Applied Science Manager på Amazon Machine Learning Solution Lab. Hon förnyar och tillämpar maskininlärning för att hjälpa AWS-kunder att påskynda deras AI- och molnintroduktion.

Tidsstämpel:

Mer från AWS maskininlärning