Stroomlijning van ETL-gegevensverwerking bij Talent.com met Amazon SageMaker | Amazon-webservices

Stroomlijning van ETL-gegevensverwerking bij Talent.com met Amazon SageMaker | Amazon-webservices

Dit bericht is mede geschreven door Anatoly Khomenko, Machine Learning Engineer, en Abdenour Bezzouh, Chief Technology Officer bij Talent.com.

Opgericht in 2011, Talent. com verzamelt betaalde vacatures van hun klanten en openbare vacatures, en heeft een uniform, gemakkelijk doorzoekbaar platform gecreรซerd. Met meer dan 30 miljoen vacatures in meer dan 75 landen en verspreid over verschillende talen, sectoren en distributiekanalen speelt Talent.com in op de uiteenlopende behoeften van werkzoekenden, waardoor miljoenen werkzoekenden effectief worden verbonden met vacatures.

De missie van Talent.com is het faciliteren van wereldwijde verbindingen met personeel. Om dit te bereiken verzamelt Talent.com vacatures uit verschillende bronnen op internet, waardoor werkzoekenden toegang krijgen tot een uitgebreide pool van meer dan 30 miljoen vacatures die zijn afgestemd op hun vaardigheden en ervaringen. In lijn met deze missie heeft Talent.com samengewerkt met AWS om een โ€‹โ€‹baanbrekende engine voor functieaanbevelingen te ontwikkelen, aangedreven door deep learning, gericht op het helpen van gebruikers bij het bevorderen van hun carriรจre.

Om de effectieve werking van deze vacature-aanbevelingsengine te garanderen, is het van cruciaal belang om een โ€‹โ€‹grootschalige gegevensverwerkingspijplijn te implementeren die verantwoordelijk is voor het extraheren en verfijnen van functies uit de geaggregeerde vacatures van Talent.com. Deze pijplijn kan in minder dan 5 uur 1 miljoen records per dag verwerken en maakt het mogelijk meerdere dagen aan records parallel te verwerken. Bovendien maakt deze oplossing een snelle implementatie in productie mogelijk. De primaire gegevensbron voor deze pijplijn is het JSON Lines-formaat, opgeslagen in Amazon eenvoudige opslagservice (Amazon S3) en gepartitioneerd op datum. Elke dag resulteert dit in het genereren van tienduizenden JSON Lines-bestanden, waarbij dagelijks incrementele updates plaatsvinden.

Het primaire doel van deze gegevensverwerkingspijplijn is het faciliteren van het creรซren van functies die nodig zijn voor het trainen en inzetten van de functie-aanbevelingsengine op Talent.com. Het is vermeldenswaard dat deze pijplijn incrementele updates moet ondersteunen en tegemoet moet komen aan de ingewikkelde vereisten voor het extraheren van functies die nodig zijn voor de trainings- en implementatiemodules die essentieel zijn voor het functieaanbevelingssysteem. Onze pijplijn behoort tot de algemene ETL-procesfamilie (extract, transform, and load) die gegevens uit meerdere bronnen combineert in een grote, centrale opslagplaats.

Voor meer inzicht in hoe Talent.com en AWS samen geavanceerde trainingstechnieken voor natuurlijke taalverwerking en deep learning-modellen hebben ontwikkeld, met behulp van Amazon Sage Maker om een โ€‹โ€‹functieaanbevelingssysteem op te stellen, zie Van tekst naar droombaan: een NLP-gebaseerde baanaanbeveler bouwen op Talent.com met Amazon SageMaker. Het systeem omvat feature-engineering, deep learning-modelarchitectuurontwerp, hyperparameteroptimalisatie en modelevaluatie, waarbij alle modules worden uitgevoerd met behulp van Python.

Dit bericht laat zien hoe we SageMaker hebben gebruikt om een โ€‹โ€‹grootschalige gegevensverwerkingspijplijn te bouwen voor het voorbereiden van functies voor de vacature-aanbevelingsengine op Talent.com. De resulterende oplossing stelt een datawetenschapper in staat om functie-extractie in een SageMaker-notebook te bedenken met behulp van Python-bibliotheken, zoals Scikit-Leren or PyTorch, en vervolgens dezelfde code snel in de gegevensverwerkingspijplijn te implementeren, waarbij functie-extractie op schaal wordt uitgevoerd. De oplossing vereist geen portering van de feature-extractiecode om PySpark te gebruiken, zoals vereist bij gebruik AWS lijm als de ETL-oplossing. Onze oplossing kan uitsluitend door een Data Scientist end-to-end worden ontwikkeld en geรฏmplementeerd met alleen een SageMaker, en vereist geen kennis van andere ETL-oplossingen, zoals AWS-batch. Dit kan de tijd die nodig is om de Machine Learning (ML)-pijplijn in productie te nemen aanzienlijk verkorten. De pijplijn wordt beheerd via Python en kan naadloos worden geรฏntegreerd met workflows voor functie-extractie, waardoor deze aanpasbaar is aan een breed scala aan data-analysetoepassingen.

Overzicht oplossingen

Overzicht voor ETL-pijplijn met behulp van SageMaker Processing

De pijplijn bestaat uit drie primaire fasen:

  1. Gebruik een Amazon SageMaker-verwerking taak om onbewerkte JSONL-bestanden te verwerken die aan een bepaalde dag zijn gekoppeld. Er kunnen meerdere dagen aan gegevens tegelijkertijd worden verwerkt door afzonderlijke verwerkingstaken.
  2. Dienst AWS lijm voor het crawlen van gegevens na het verwerken van meerdere dagen aan gegevens.
  3. Laad verwerkte objecten voor een opgegeven datumbereik met behulp van SQL uit een Amazone Athene tabel, train en implementeer vervolgens het taakaanbevelingsmodel.

Verwerk onbewerkte JSONL-bestanden

We verwerken onbewerkte JSONL-bestanden voor een bepaalde dag met behulp van een SageMaker Processing-taak. De taak implementeert functie-extractie en gegevensverdichting, en slaat verwerkte functies op in Parquet-bestanden met 1 miljoen records per bestand. We maken gebruik van CPU-parallellisatie om functie-extractie voor elk onbewerkt JSONL-bestand parallel uit te voeren. De verwerkingsresultaten van elk JSONL-bestand worden opgeslagen in een afzonderlijk Parquet-bestand in een tijdelijke map. Nadat alle JSONL-bestanden zijn verwerkt, comprimeren we duizenden kleine Parquet-bestanden tot meerdere bestanden met 1 miljoen records per bestand. De gecomprimeerde Parquet-bestanden worden vervolgens geรผpload naar Amazon S3 als uitvoer van de verwerkingstaak. De gegevensverdichting zorgt voor efficiรซnt crawlen en SQL-query's in de volgende fasen van de pijplijn.

Hier volgt de voorbeeldcode voor het plannen van een SageMaker Processing-taak voor een bepaalde dag, bijvoorbeeld 2020-01-01, met behulp van de SageMaker SDK. De taak leest onbewerkte JSONL-bestanden van Amazon S3 (bijvoorbeeld van s3://bucket/raw-data/2020/01/01) en slaat de gecomprimeerde Parquet-bestanden op in Amazon S3 (bijvoorbeeld naar s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Het volgende codeoverzicht voor het hoofdscript (processing_script.py) waarmee de SageMaker Processing-taak wordt uitgevoerd, is als volgt:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

Schaalbaarheid is een belangrijk kenmerk van onze pijplijn. Ten eerste kunnen meerdere SageMaker Processing-taken worden gebruikt om gegevens gedurende meerdere dagen tegelijkertijd te verwerken. Ten tweede vermijden we dat we de volledige verwerkte of onbewerkte gegevens in รฉรฉn keer in het geheugen laden, terwijl we elke specifieke dag aan gegevens verwerken. Dit maakt de verwerking van gegevens mogelijk met behulp van exemplaartypen die niet de gegevens van een volledige dag in het primaire geheugen kunnen huisvesten. De enige vereiste is dat het exemplaartype in staat moet zijn N onbewerkte JSONL- of verwerkte Parquet-bestanden tegelijkertijd in het geheugen te laden, waarbij N het aantal gebruikte proceswerkers is.

Crawl verwerkte gegevens met AWS Glue

Nadat alle ruwe data van meerdere dagen zijn verwerkt, kunnen we van de gehele dataset een Athena-tabel maken met behulp van een AWS Glue-crawler. Wij gebruiken de AWS SDK voor panda's (awswrangler) bibliotheek om de tabel te maken met behulp van het volgende fragment:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Laad verwerkte functies voor training

Verwerkte functies voor een opgegeven datumbereik kunnen nu met behulp van SQL uit de Athena-tabel worden geladen, en deze functies kunnen vervolgens worden gebruikt voor het trainen van het taakaanbevelingsmodel. Het volgende fragment laadt bijvoorbeeld een maand aan verwerkte functies in een DataFrame met behulp van de awswrangler bibliotheek:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Bovendien kan het gebruik van SQL voor het laden van verwerkte functies voor training worden uitgebreid om tegemoet te komen aan verschillende andere gebruiksscenario's. We kunnen bijvoorbeeld een vergelijkbare pijplijn toepassen om twee afzonderlijke Athena-tabellen bij te houden: รฉรฉn voor het opslaan van gebruikersvertoningen en een andere voor het opslaan van gebruikersklikken op deze vertoningen. Met behulp van SQL-join-instructies kunnen we vertoningen ophalen waarop gebruikers hebben geklikt of niet hebben geklikt, en deze vertoningen vervolgens doorgeven aan een modeltrainingstaak.

Oplossing voordelen

Het implementeren van de voorgestelde oplossing brengt verschillende voordelen met zich mee voor onze bestaande workflow, waaronder:

  • Vereenvoudigde implementatie โ€“ Met de oplossing kan feature-extractie in Python worden geรฏmplementeerd met behulp van populaire ML-bibliotheken. En het vereist niet dat de code naar PySpark wordt geport. Dit stroomlijnt de extractie van functies, aangezien dezelfde code die door een datawetenschapper in een notebook is ontwikkeld, door deze pijplijn zal worden uitgevoerd.
  • Snel pad naar productie โ€“ De oplossing kan worden ontwikkeld en ingezet door een datawetenschapper om op schaal feature-extractie uit te voeren, waardoor hij of zij een ML-aanbevelingsmodel op basis van deze gegevens kan ontwikkelen. Tegelijkertijd kan dezelfde oplossing door een ML-ingenieur in productie worden genomen, met kleine aanpassingen.
  • herbruikbaarheid โ€“ De oplossing biedt een herbruikbaar patroon voor het op grote schaal extraheren van kenmerken, en kan eenvoudig worden aangepast voor andere gebruiksscenario's dan het bouwen van aanbevelingsmodellen.
  • Efficiรซnt โ€“ De oplossing biedt goede prestaties: verwerking van รฉรฉn dag van de Talent. com's gegevens duurden minder dan 1 uur.
  • Incrementele updates โ€“ De oplossing ondersteunt ook incrementele updates. Nieuwe dagelijkse gegevens kunnen worden verwerkt met een SageMaker Processing-taak, en de S3-locatie met de verwerkte gegevens kan opnieuw worden gecrawld om de Athena-tabel bij te werken. We kunnen ook een cronjob gebruiken om de gegevens van vandaag meerdere keren per dag bij te werken (bijvoorbeeld elke 3 uur).

We hebben deze ETL-pijplijn gebruikt om Talent.com te helpen 50,000 bestanden per dag met 5 miljoen records te verwerken, en trainingsgegevens te creรซren met behulp van functies die zijn geรซxtraheerd uit 90 dagen aan ruwe gegevens van Talent.com: een totaal van 450 miljoen records verdeeld over 900,000 bestanden. Dankzij onze pipeline kon Talent.com het aanbevelingssysteem binnen slechts twee weken bouwen en in productie nemen. De oplossing voerde alle ML-processen uit, inclusief ETL op Amazon SageMaker, zonder gebruik te maken van andere AWS-services. Het functieaanbevelingssysteem zorgde voor een stijging van 2% in de klikfrequentie bij online A/B-testen vergeleken met een eerdere op XGBoost gebaseerde oplossing, waardoor miljoenen Talent.com-gebruikers aan betere banen werden gekoppeld.

Conclusie

Dit bericht schetst de ETL-pijplijn die we hebben ontwikkeld voor functieverwerking voor training en implementatie van een functieaanbevelingsmodel bij Talent.com. Onze pijplijn maakt gebruik van SageMaker Processing-taken voor efficiรซnte gegevensverwerking en functie-extractie op grote schaal. Functie-extractiecode is geรฏmplementeerd in Python, waardoor het gebruik van populaire ML-bibliotheken mogelijk is om functie-extractie op schaal uit te voeren, zonder dat de code hoeft te worden geporteerd om PySpark te gebruiken.

We moedigen de lezers aan om de mogelijkheid te onderzoeken om de pijplijn die in deze blog wordt gepresenteerd te gebruiken als sjabloon voor hun gebruiksscenario's waarbij functie-extractie op schaal vereist is. De pijplijn kan door een datawetenschapper worden gebruikt om een โ€‹โ€‹ML-model te bouwen, en dezelfde pijplijn kan vervolgens door een ML-ingenieur worden overgenomen om in productie te draaien. Dit kan de tijd die nodig is om de ML-oplossing end-to-end te produceren aanzienlijk verkorten, zoals het geval was bij Talent.com. De lezers kunnen verwijzen naar de tutorial voor het instellen en uitvoeren van SageMaker Processing-taken. We verwijzen de lezers ook naar het bericht Van tekst naar droombaan: een NLP-gebaseerde baanaanbeveler bouwen op Talent.com met Amazon SageMaker, waar we het gebruik van deep learning-modeltrainingstechnieken bespreken Amazon Sage Maker om het functieaanbevelingssysteem van Talent.com te bouwen.


Over de auteurs

Dmitri BespalovDmitri Bespalov is een Senior Applied Scientist bij het Amazon Machine Learning Solutions Lab, waar hij AWS-klanten in verschillende sectoren helpt om hun AI- en cloudadoptie te versnellen.

Yi XiangYi Xiang is een Applied Scientist II bij het Amazon Machine Learning Solutions Lab, waar ze AWS-klanten in verschillende sectoren helpt hun AI- en cloud-adoptie te versnellen.

Tong WangTong Wang is een Senior Applied Scientist bij het Amazon Machine Learning Solutions Lab, waar hij AWS-klanten in verschillende sectoren helpt om hun AI- en cloudadoptie te versnellen.

Anatoly KhomenkoAnatoly Khomenko is een Senior Machine Learning Engineer bij Talent. com met een passie voor natuurlijke taalverwerking, waarbij goede mensen aan goede banen worden gekoppeld.

Abdenour BezzouhAbdenour Bezzouh is een leidinggevende met meer dan 25 jaar ervaring in het bouwen en leveren van technologische oplossingen die opschalen naar miljoenen klanten. Abdenour bekleedde de functie van Chief Technology Officer (CTO) bij Talent. com toen het AWS-team deze specifieke oplossing ontwierp en uitvoerde Talent. com.

Yanjun QiYanjun Qi is Senior Applied Science Manager bij het Amazon Machine Learning Solution Lab. Ze innoveert en past machine learning toe om AWS-klanten te helpen hun AI- en cloudadoptie te versnellen.

Tijdstempel:

Meer van AWS-machine learning