Rationalisation du traitement des données ETL chez Talent.com avec Amazon SageMaker | Services Web Amazon

Rationalisation du traitement des données ETL chez Talent.com avec Amazon SageMaker | Services Web Amazon

Cet article est co-écrit par Anatoly Khomenko, ingénieur en machine learning, et Abdenour Bezzouh, directeur technologique chez Talent.com.

Établi en 2011, talent.com regroupe les offres d'emploi rémunérées de leurs clients et les offres d'emploi publiques, et a créé une plate-forme unifiée et facilement consultable. Couvrant plus de 30 millions d'offres d'emploi dans plus de 75 pays et couvrant diverses langues, secteurs et canaux de distribution, Talent.com répond aux divers besoins des demandeurs d'emploi, connectant efficacement des millions de demandeurs d'emploi aux opportunités d'emploi.

La mission de Talent.com est de faciliter les connexions de main-d’œuvre mondiale. Pour y parvenir, Talent.com regroupe les offres d'emploi provenant de diverses sources sur le Web, offrant aux demandeurs d'emploi l'accès à un vaste bassin de plus de 30 millions d'opportunités d'emploi adaptées à leurs compétences et expériences. Conformément à cette mission, Talent.com a collaboré avec AWS pour développer un moteur de recommandation d'emploi de pointe basé sur l'apprentissage profond, visant à aider les utilisateurs à faire progresser leur carrière.

Pour garantir le fonctionnement efficace de ce moteur de recommandation d’emploi, il est crucial de mettre en œuvre un pipeline de traitement de données à grande échelle chargé d’extraire et d’affiner les fonctionnalités des offres d’emploi agrégées de Talent.com. Ce pipeline est capable de traiter 5 millions d'enregistrements quotidiens en moins d'une heure et permet de traiter plusieurs jours d'enregistrements en parallèle. De plus, cette solution permet un déploiement rapide en production. La principale source de données pour ce pipeline est le format JSON Lines, stocké dans Service de stockage simple Amazon (Amazon S3) et partitionné par date. Chaque jour, cela entraîne la génération de dizaines de milliers de fichiers JSON Lines, avec des mises à jour incrémentielles effectuées quotidiennement.

L'objectif premier de ce pipeline de traitement de données est de faciliter la création des fonctionnalités nécessaires à la formation et au déploiement du moteur de recommandation d'emploi sur Talent.com. Il convient de noter que ce pipeline doit prendre en charge les mises à jour incrémentielles et répondre aux exigences complexes d’extraction de fonctionnalités nécessaires aux modules de formation et de déploiement essentiels au système de recommandation d’emploi. Notre pipeline appartient à la famille générale de processus ETL (extraire, transformer et charger) qui combine des données provenant de plusieurs sources dans un vaste référentiel central.

Pour plus d'informations sur la façon dont Talent.com et AWS ont élaboré en collaboration des techniques de pointe de traitement du langage naturel et de formation de modèles d'apprentissage en profondeur, en utilisant Amazon Sage Maker pour élaborer un système de recommandation d'emploi, reportez-vous à Du texte au travail de rêve : créer un outil de recommandation d'emploi basé sur la PNL sur Talent.com avec Amazon SageMaker. Le système comprend l'ingénierie des fonctionnalités, la conception d'architecture de modèle d'apprentissage en profondeur, l'optimisation des hyperparamètres et l'évaluation du modèle, où tous les modules sont exécutés à l'aide de Python.

Cet article montre comment nous avons utilisé SageMaker pour créer un pipeline de traitement de données à grande échelle afin de préparer les fonctionnalités du moteur de recommandation d'emploi de Talent.com. La solution résultante permet à un Data Scientist d'imaginer l'extraction de fonctionnalités dans un notebook SageMaker à l'aide de bibliothèques Python, telles que Scikit-Apprendre or PyTorch, puis de déployer rapidement le même code dans le pipeline de traitement des données en effectuant une extraction de fonctionnalités à grande échelle. La solution ne nécessite pas de porter le code d'extraction de fonctionnalités pour utiliser PySpark, comme requis lors de l'utilisation Colle AWS comme solution ETL. Notre solution peut être développée et déployée de bout en bout uniquement par un Data Scientist en utilisant uniquement un SageMaker, et ne nécessite pas de connaissance d'autres solutions ETL, telles que Lot AWS. Cela peut réduire considérablement le temps nécessaire au déploiement du pipeline Machine Learning (ML) en production. Le pipeline est exploité via Python et s'intègre de manière transparente aux flux de travail d'extraction de fonctionnalités, le rendant adaptable à un large éventail d'applications d'analyse de données.

Vue d'ensemble de la solution

Présentation du pipeline ETL utilisant SageMaker Processing

Le pipeline comprend trois phases principales :

  1. Utiliser un Traitement d'Amazon SageMaker travail pour gérer les fichiers JSONL bruts associés à un jour spécifié. Plusieurs jours de données peuvent être traités simultanément par des tâches de traitement distinctes.
  2. Employer Colle AWS pour l'exploration des données après avoir traité plusieurs jours de données.
  3. Charger les fonctionnalités traitées pour une plage de dates spécifiée à l'aide de SQL à partir d'un Amazone Athéna table, puis entraînez et déployez le modèle de recommandation d'emploi.

Traiter les fichiers JSONL bruts

Nous traitons les fichiers JSONL bruts pour une journée spécifiée à l'aide d'un travail de traitement SageMaker. Le travail implémente l'extraction de fonctionnalités et le compactage des données, et enregistre les fonctionnalités traitées dans des fichiers Parquet avec 1 million d'enregistrements par fichier. Nous profitons de la parallélisation du processeur pour effectuer en parallèle l'extraction de fonctionnalités pour chaque fichier JSONL brut. Les résultats du traitement de chaque fichier JSONL sont enregistrés dans un fichier Parquet distinct dans un répertoire temporaire. Une fois tous les fichiers JSONL traités, nous effectuons le compactage de milliers de petits fichiers Parquet en plusieurs fichiers avec 1 million d'enregistrements par fichier. Les fichiers Parquet compactés sont ensuite téléchargés dans Amazon S3 en tant que résultat de la tâche de traitement. Le compactage des données garantit une exploration et des requêtes SQL efficaces dans les étapes suivantes du pipeline.

Voici l'exemple de code permettant de planifier une tâche de traitement SageMaker pour un jour spécifié, par exemple le 2020/01/01, à l'aide du SDK SageMaker. La tâche lit les fichiers JSONL bruts d'Amazon S3 (par exemple depuis s3://bucket/raw-data/2020/01/01) et enregistre les fichiers Parquet compactés dans Amazon S3 (par exemple pour s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Le schéma de code suivant pour le script principal (processing_script.py) qui exécute la tâche SageMaker Processing est la suivante :

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

L'évolutivité est une caractéristique clé de notre pipeline. Premièrement, plusieurs tâches SageMaker Processing peuvent être utilisées pour traiter des données pendant plusieurs jours simultanément. Deuxièmement, nous évitons de charger simultanément toutes les données traitées ou brutes en mémoire, lors du traitement de chaque jour spécifié de données. Cela permet le traitement des données à l’aide de types d’instances qui ne peuvent pas accueillir une journée complète de données dans la mémoire principale. La seule exigence est que le type d'instance soit capable de charger simultanément N fichiers JSONL bruts ou Parquet traités en mémoire, N étant le nombre de processeurs de processus utilisés.

Analyser les données traitées à l'aide d'AWS Glue

Une fois que toutes les données brutes de plusieurs jours ont été traitées, nous pouvons créer une table Athena à partir de l'ensemble de données à l'aide d'un robot d'exploration AWS Glue. Nous utilisons le AWS SDK pour les pandas (awswrangler) bibliothèque pour créer la table à l'aide de l'extrait suivant :

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Charger les fonctionnalités traitées pour la formation

Les fonctionnalités traitées pour une plage de dates spécifiée peuvent désormais être chargées à partir de la table Athena à l'aide de SQL, et ces fonctionnalités peuvent ensuite être utilisées pour entraîner le modèle de recommandation de tâches. Par exemple, l'extrait suivant charge un mois de fonctionnalités traitées dans un DataFrame à l'aide de l'option awswrangler bibliothèque:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

De plus, l'utilisation de SQL pour charger les fonctionnalités traitées pour la formation peut être étendue pour s'adapter à divers autres cas d'utilisation. Par exemple, nous pouvons appliquer un pipeline similaire pour gérer deux tables Athena distinctes : une pour stocker les impressions des utilisateurs et une autre pour stocker les clics des utilisateurs sur ces impressions. À l'aide des instructions de jointure SQL, nous pouvons récupérer les impressions sur lesquelles les utilisateurs ont cliqué ou n'ont pas cliqué, puis transmettre ces impressions à une tâche de formation de modèle.

Avantages de la solution

La mise en œuvre de la solution proposée apporte plusieurs avantages à notre flux de travail existant, notamment :

  • Mise en œuvre simplifiée – La solution permet d'implémenter l'extraction de fonctionnalités en Python à l'aide de bibliothèques ML populaires. Et cela ne nécessite pas que le code soit porté dans PySpark. Cela rationalise l'extraction de fonctionnalités, car le même code développé par un Data Scientist dans un notebook sera exécuté par ce pipeline.
  • Accès rapide à la production – La solution peut être développée et déployée par un Data Scientist pour effectuer une extraction de fonctionnalités à grande échelle, lui permettant de développer un modèle de recommandation ML par rapport à ces données. Dans le même temps, la même solution peut être déployée en production par un ingénieur ML avec peu de modifications nécessaires.
  • Réutilisable – La solution fournit un modèle réutilisable pour l’extraction de fonctionnalités à grande échelle et peut être facilement adaptée à d’autres cas d’utilisation au-delà de la création de modèles de recommandation.
  • Efficacité – La solution offre de bonnes performances : traitement d’une seule journée du talent.comLes données de ont pris moins d'une heure.
  • Mises à jour incrémentielles – La solution prend également en charge les mises à jour incrémentielles. Les nouvelles données quotidiennes peuvent être traitées avec une tâche de traitement SageMaker, et l'emplacement S3 contenant les données traitées peut être réexploré pour mettre à jour la table Athena. Nous pouvons également utiliser une tâche cron pour mettre à jour les données du jour plusieurs fois par jour (par exemple toutes les 3 heures).

Nous avons utilisé ce pipeline ETL pour aider Talent.com à traiter 50,000 5 fichiers par jour contenant 90 millions d'enregistrements et à créer des données de formation à l'aide de fonctionnalités extraites de 450 jours de données brutes de Talent.com, soit un total de 900,000 millions d'enregistrements répartis sur 2 8.6 fichiers. Notre pipeline a aidé Talent.com à créer et à déployer le système de recommandation en production en seulement XNUMX semaines. La solution a exécuté tous les processus ML, y compris ETL sur Amazon SageMaker sans utiliser d'autre service AWS. Le système de recommandation d’emploi a entraîné une augmentation de XNUMX % du taux de clics dans les tests A/B en ligne par rapport à une précédente solution basée sur XGBoost, aidant ainsi à connecter des millions d’utilisateurs de Talent.com à de meilleurs emplois.

Conclusion

Cet article décrit le pipeline ETL que nous avons développé pour le traitement des fonctionnalités pour la formation et le déploiement d'un modèle de recommandation d'emploi sur Talent.com. Notre pipeline utilise les tâches SageMaker Processing pour un traitement efficace des données et une extraction de fonctionnalités à grande échelle. Le code d'extraction de fonctionnalités est implémenté en Python, permettant l'utilisation de bibliothèques ML populaires pour effectuer une extraction de fonctionnalités à grande échelle, sans avoir besoin de porter le code pour utiliser PySpark.

Nous encourageons les lecteurs à explorer la possibilité d'utiliser le pipeline présenté dans ce blog comme modèle pour leurs cas d'utilisation où l'extraction de fonctionnalités à grande échelle est requise. Le pipeline peut être exploité par un Data Scientist pour créer un modèle ML, et le même pipeline peut ensuite être adopté par un ingénieur ML pour être exécuté en production. Cela peut réduire considérablement le temps nécessaire à la production de la solution ML de bout en bout, comme ce fut le cas avec Talent.com. Les lecteurs peuvent se référer au tutoriel pour configurer et exécuter des tâches SageMaker Processing. Nous renvoyons également les lecteurs à consulter l'article Du texte au travail de rêve : créer un outil de recommandation d'emploi basé sur la PNL sur Talent.com avec Amazon SageMaker, où nous discutons des techniques de formation de modèles d'apprentissage profond utilisant Amazon Sage Maker pour construire le système de recommandation d’emploi de Talent.com.


À propos des auteurs

Dimitri BespalovDimitri Bespalov est Senior Applied Scientist au Amazon Machine Learning Solutions Lab, où il aide les clients AWS de différents secteurs à accélérer leur adoption de l'IA et du cloud.

Yi XiangYi Xiang est Applied Scientist II au Amazon Machine Learning Solutions Lab, où elle aide les clients AWS de différents secteurs à accélérer leur adoption de l'IA et du cloud.

Tong WangTong Wang est Senior Applied Scientist au Amazon Machine Learning Solutions Lab, où il aide les clients AWS de différents secteurs à accélérer leur adoption de l'IA et du cloud.

Anatoli KhomenkoAnatoli Khomenko est ingénieur senior en apprentissage automatique chez talent.com avec une passion pour le traitement du langage naturel, jumelant de bonnes personnes à de bons emplois.

Abdenour BezzouhAbdenour Bezzouh est un cadre avec plus de 25 ans d'expérience dans la création et la fourniture de solutions technologiques adaptées à des millions de clients. Abdenour a occupé le poste de Chief Technology Officer (CTO) chez talent.com lorsque l'équipe AWS a conçu et exécuté cette solution particulière pour talent.com.

Yanjun QiYanjun Qi est Senior Applied Science Manager au Amazon Machine Learning Solution Lab. Elle innove et applique l'apprentissage automatique pour aider les clients d'AWS à accélérer leur adoption de l'IA et du cloud.

Horodatage:

Plus de Apprentissage automatique AWS