Strømlining af ETL-databehandling på Talent.com med Amazon SageMaker | Amazon Web Services

Strømlining af ETL-databehandling på Talent.com med Amazon SageMaker | Amazon Web Services

Dette indlæg er medforfattet af Anatoly Khomenko, Machine Learning Engineer, og Abdenour Bezzouh, Chief Technology Officer hos Talent.com.

Etableret i 2011, talent.com samler betalte jobannoncer fra deres kunder og offentlige jobopslag og har skabt en samlet, let søgbar platform. Talent.com dækker over 30 millioner jobannoncer i mere end 75 lande og spænder over forskellige sprog, brancher og distributionskanaler, og imødekommer jobsøgendes forskellige behov og forbinder effektivt millioner af jobsøgende med jobmuligheder.

Talent.coms mission er at lette globale arbejdsstyrkeforbindelser. For at opnå dette samler Talent.com jobannoncer fra forskellige kilder på nettet, hvilket giver jobsøgende adgang til en omfattende pulje på over 30 millioner jobmuligheder skræddersyet til deres færdigheder og erfaringer. I tråd med denne mission samarbejdede Talent.com med AWS om at udvikle en banebrydende jobanbefalingsmotor drevet af dyb læring, der sigter mod at hjælpe brugere med at fremme deres karriere.

For at sikre den effektive drift af denne jobanbefalingsmotor er det afgørende at implementere en storstilet databehandlingspipeline, der er ansvarlig for at udtrække og forfine funktioner fra Talent.com's samlede jobopslag. Denne pipeline er i stand til at behandle 5 millioner daglige registreringer på mindre end 1 time og giver mulighed for at behandle flere dages registreringer parallelt. Derudover giver denne løsning mulighed for en hurtig implementering til produktion. Den primære datakilde for denne pipeline er JSON Lines-formatet, gemt i Amazon Simple Storage Service (Amazon S3) og opdelt efter dato. Hver dag resulterer dette i generering af titusindvis af JSON Lines-filer, med trinvise opdateringer, der forekommer dagligt.

Det primære formål med denne databehandlingspipeline er at lette oprettelsen af ​​funktioner, der er nødvendige for træning og implementering af jobanbefalingsmotoren på Talent.com. Det er værd at bemærke, at denne pipeline skal understøtte trinvise opdateringer og imødekomme de indviklede funktionsudtrækningskrav, der er nødvendige for de trænings- og implementeringsmoduler, der er vigtige for jobanbefalingssystemet. Vores pipeline tilhører den generelle ETL (ekstrahere, transformere og indlæse) procesfamilie, der kombinerer data fra flere kilder til et stort centralt lager.

For yderligere indsigt i, hvordan Talent.com og AWS i fællesskab byggede banebrydende naturligt sprogbehandling og deep learning modeltræningsteknikker ved at bruge Amazon SageMaker at lave et jobanbefalingssystem, se Fra tekst til drømmejob: Opbygning af en NLP-baseret jobanbefaler på Talent.com med Amazon SageMaker. Systemet omfatter feature engineering, deep learning modelarkitekturdesign, hyperparameteroptimering og modelevaluering, hvor alle moduler køres ved hjælp af Python.

Dette indlæg viser, hvordan vi brugte SageMaker til at bygge en storstilet databehandlingspipeline til at forberede funktioner til jobanbefalingsmotoren på Talent.com. Den resulterende løsning gør det muligt for en Data Scientist at idetage ekstraktion af funktioner i en SageMaker notesbog ved hjælp af Python-biblioteker, som f.eks. Scikit-Lær or PyTorch, og derefter hurtigt at implementere den samme kode i databehandlingspipelinen ved at udføre funktionsudtræk i skala. Løsningen kræver ikke portering af funktionsudtrækskoden for at bruge PySpark, som det kræves ved brug AWS Lim som ETL-løsningen. Vores løsning kan udvikles og implementeres udelukkende af en Data Scientist end-to-end ved kun at bruge en SageMaker, og kræver ikke kendskab til andre ETL-løsninger, som f.eks. AWS batch. Dette kan væsentligt forkorte den tid, der er nødvendig for at implementere Machine Learning (ML) pipeline til produktion. Pipelinen drives gennem Python og integreres problemfrit med funktionsudtrækningsarbejdsgange, hvilket gør den tilpasselig til en lang række dataanalyseapplikationer.

Løsningsoversigt

Oversigt over ETL-pipeline ved hjælp af SageMaker Processing

Rørledningen består af tre primære faser:

  1. Brug en Amazon SageMaker-behandling job til at håndtere rå JSONL-filer tilknyttet en bestemt dag. Flere dages data kan behandles af separate behandlingsjob samtidigt.
  2. Beskæftige AWS Lim til datacrawling efter behandling af flere dages data.
  3. Indlæs behandlede funktioner for et angivet datointerval ved hjælp af SQL fra en Amazonas Athena tabel, og derefter træne og implementere jobanbefalingsmodellen.

Behandle rå JSONL-filer

Vi behandler rå JSONL-filer for en bestemt dag ved hjælp af et SageMaker Processing-job. Jobbet implementerer udtræk af funktioner og datakomprimering og gemmer behandlede funktioner i Parket-filer med 1 million poster pr. fil. Vi udnytter CPU-parallelisering til at udføre funktionsekstraktion for hver rå JSONL-fil parallelt. Behandlingsresultater af hver JSONL-fil gemmes i en separat Parquet-fil i en midlertidig mappe. Efter at alle JSONL-filerne er blevet behandlet, udfører vi komprimering af tusindvis af små parketfiler til flere filer med 1 million poster pr. fil. De komprimerede Parket-filer uploades derefter til Amazon S3 som output fra forarbejdningsjobbet. Datakomprimeringen sikrer effektiv gennemgang og SQL-forespørgsler i de næste stadier af pipelinen.

Følgende er eksempelkoden til at planlægge et SageMaker Processing-job for en bestemt dag, for eksempel 2020-01-01, ved hjælp af SageMaker SDK. Jobbet læser rå JSONL-filer fra Amazon S3 (f.eks. fra s3://bucket/raw-data/2020/01/01) og gemmer de komprimerede Parket-filer i Amazon S3 (for eksempel til s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Følgende kodeoversigt for hovedscriptet (processing_script.py), der kører SageMaker Processing-jobbet er som følger:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

Skalerbarhed er en nøglefunktion i vores pipeline. For det første kan flere SageMaker Processing-job bruges til at behandle data i flere dage samtidigt. For det andet undgår vi at indlæse hele behandlede eller rå data i hukommelsen på én gang, mens vi behandler hver specificeret dag med data. Dette muliggør behandling af data ved hjælp af instanstyper, der ikke kan rumme en hel dags data i den primære hukommelse. Det eneste krav er, at instanstypen skal være i stand til at indlæse N rå JSONL- eller behandlede Parket-filer i hukommelsen samtidigt, hvor N er antallet af procesarbejdere, der er i brug.

Gennemgå behandlede data ved hjælp af AWS Glue

Efter at alle rådata for flere dage er blevet behandlet, kan vi oprette en Athena-tabel fra hele datasættet ved at bruge en AWS Glue-crawler. Vi bruger AWS SDK til pandaer (awswrangler) bibliotek for at oprette tabellen ved hjælp af følgende uddrag:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Indlæs behandlede funktioner til træning

Bearbejdede funktioner for et specificeret datointerval kan nu indlæses fra Athena-tabellen ved hjælp af SQL, og disse funktioner kan derefter bruges til træning af jobanbefalingsmodellen. For eksempel indlæser følgende uddrag en måneds behandlede funktioner i en DataFrame ved hjælp af awswrangler bibliotek:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Derudover kan brugen af ​​SQL til at indlæse behandlede funktioner til træning udvides til at rumme forskellige andre brugssager. For eksempel kan vi anvende en lignende pipeline til at opretholde to separate Athena-tabeller: en til lagring af brugerindtryk og en anden til lagring af brugerklik på disse visninger. Ved at bruge SQL join-sætninger kan vi hente visninger, som brugere enten klikkede på eller ikke klikkede på, og derefter videregive disse visninger til et modeloplæringsjob.

Løsningsfordele

Implementering af den foreslåede løsning giver vores eksisterende arbejdsgang flere fordele, herunder:

  • Forenklet implementering – Løsningen gør det muligt at implementere udtræk af funktioner i Python ved hjælp af populære ML-biblioteker. Og det kræver ikke, at koden porteres ind i PySpark. Dette strømliner ekstraktion af funktioner, da den samme kode udviklet af en dataforsker i en notesbog vil blive udført af denne pipeline.
  • Hurtig vej til produktion – Løsningen kan udvikles og implementeres af en dataforsker til at udføre funktionsekstraktion i stor skala, hvilket gør dem i stand til at udvikle en ML-anbefalingsmodel mod disse data. Samtidig kan den samme løsning implementeres til produktion af en ML-ingeniør med små ændringer.
  • Reus Evne – Løsningen giver et genanvendeligt mønster til funktionsudtrækning i stor skala og kan nemt tilpasses til andre anvendelsestilfælde ud over byggeanbefalingsmodeller.
  • Effektivitet – Løsningen giver god ydeevne: behandler en enkelt dag af talent.coms data tog mindre end 1 time.
  • Trinvise opdateringer – Løsningen understøtter også trinvise opdateringer. Nye daglige data kan behandles med et SageMaker Processing-job, og S3-lokationen, der indeholder de behandlede data, kan crawles igen for at opdatere Athena-tabellen. Vi kan også bruge et cron-job til at opdatere dagens data flere gange om dagen (for eksempel hver 3. time).

Vi brugte denne ETL-pipeline til at hjælpe Talent.com med at behandle 50,000 filer om dagen indeholdende 5 millioner poster, og skabte træningsdata ved hjælp af funktioner udtrukket fra 90 dages rådata fra Talent.com – i alt 450 millioner poster på tværs af 900,000 filer. Vores pipeline hjalp Talent.com med at bygge og implementere anbefalingssystemet i produktion inden for kun 2 uger. Løsningen udførte alle ML-processer inklusive ETL på Amazon SageMaker uden at bruge anden AWS-tjeneste. Jobanbefalingssystemet førte til en stigning på 8.6 % i klikfrekvensen i online A/B-tests i forhold til en tidligere XGBoost-baseret løsning, hvilket hjalp med at forbinde millioner af Talent.coms brugere til bedre job.

Konklusion

Dette indlæg skitserer den ETL-pipeline, vi udviklede til funktionsbehandling til træning og implementering af en jobanbefalingsmodel på Talent.com. Vores pipeline bruger SageMaker Processing-job til effektiv databehandling og udtræk af funktioner i stor skala. Funktionsudtrækningskode er implementeret i Python, hvilket muliggør brugen af ​​populære ML-biblioteker til at udføre funktionsudtrækning i skala uden behov for at portere koden for at bruge PySpark.

Vi opfordrer læserne til at udforske muligheden for at bruge den pipeline, der præsenteres i denne blog, som en skabelon for deres use-cases, hvor funktionsudtræk i skala er påkrævet. Pipelinen kan udnyttes af en dataforsker til at bygge en ML-model, og den samme pipeline kan derefter adopteres af en ML-ingeniør til at køre i produktion. Dette kan betydeligt reducere den tid, der kræves til at producere ML-løsningen fra ende til ende, som det var tilfældet med Talent.com. Læserne kan henvise til tutorial til opsætning og kørsel af SageMaker Processing-job. Vi henviser også læserne til at se opslaget Fra tekst til drømmejob: Opbygning af en NLP-baseret jobanbefaler på Talent.com med Amazon SageMaker, hvor vi diskuterer deep learning model træningsteknikker ved at bruge Amazon SageMaker at bygge Talent.coms jobanbefalingssystem.


Om forfatterne

Dmitriy BespalovDmitriy Bespalov er Senior Applied Scientist ved Amazon Machine Learning Solutions Lab, hvor han hjælper AWS-kunder på tværs af forskellige industrier med at accelerere deres AI og cloud-adoption.

Yi XiangYi Xiang er Applied Scientist II ved Amazon Machine Learning Solutions Lab, hvor hun hjælper AWS-kunder på tværs af forskellige industrier med at accelerere deres AI og cloud-adoption.

Tong WangTong Wang er Senior Applied Scientist ved Amazon Machine Learning Solutions Lab, hvor han hjælper AWS-kunder på tværs af forskellige industrier med at accelerere deres AI og cloud-adoption.

Anatoly KhomenkoAnatoly Khomenko er Senior Machine Learning Engineer hos talent.com med en passion for naturlig sprogbehandling, der matcher gode mennesker til gode jobs.

Abdenour BezzouhAbdenour Bezzouh er en leder med mere end 25 års erfaring med at bygge og levere teknologiske løsninger, der skaleres til millioner af kunder. Abdenour havde stillingen som Chief Technology Officer (CTO) kl talent.com da AWS-teamet designede og udførte netop denne løsning til talent.com.

Yanjun QiYanjun Qi er Senior Applied Science Manager hos Amazon Machine Learning Solution Lab. Hun innoverer og anvender maskinlæring for at hjælpe AWS-kunder med at fremskynde deres AI og cloud-adoption.

Tidsstempel:

Mere fra AWS maskinindlæring