Optimieren Sie die ETL-Datenverarbeitung bei Talent.com mit Amazon SageMaker | Amazon Web Services

Optimieren Sie die ETL-Datenverarbeitung bei Talent.com mit Amazon SageMaker | Amazon Web Services

Dieser Beitrag wurde gemeinsam von Anatoly Khomenko, Ingenieur für maschinelles Lernen, und Abdenour Bezzouh, Chief Technology Officer bei Talent.com, verfasst.

Gegründet in 2011, talent.com fasst bezahlte Stellenangebote seiner Kunden und öffentliche Stellenangebote zusammen und hat eine einheitliche, leicht durchsuchbare Plattform geschaffen. Mit über 30 Millionen Stellenangeboten in mehr als 75 Ländern und in verschiedenen Sprachen, Branchen und Vertriebskanälen geht Talent.com auf die unterschiedlichen Bedürfnisse von Arbeitssuchenden ein und vermittelt Millionen von Arbeitssuchenden effektiv Stellenangebote.

Die Mission von Talent.com besteht darin, globale Arbeitskräfteverbindungen zu ermöglichen. Um dies zu erreichen, fasst Talent.com Stellenangebote aus verschiedenen Quellen im Internet zusammen und bietet Arbeitssuchenden Zugang zu einem umfangreichen Pool von über 30 Millionen Stellenangeboten, die auf ihre Fähigkeiten und Erfahrungen zugeschnitten sind. Im Einklang mit dieser Mission hat Talent.com mit AWS zusammengearbeitet, um eine hochmoderne Jobempfehlungsmaschine zu entwickeln, die auf Deep Learning basiert und Benutzer bei der Weiterentwicklung ihrer Karriere unterstützen soll.

Um den effektiven Betrieb dieser Stellenempfehlungsmaschine sicherzustellen, ist es von entscheidender Bedeutung, eine umfangreiche Datenverarbeitungspipeline zu implementieren, die für die Extraktion und Verfeinerung von Funktionen aus den aggregierten Stellenangeboten von Talent.com verantwortlich ist. Diese Pipeline ist in der Lage, 5 Millionen tägliche Datensätze in weniger als einer Stunde zu verarbeiten und ermöglicht die parallele Verarbeitung von Datensätzen über mehrere Tage. Darüber hinaus ermöglicht diese Lösung eine schnelle Bereitstellung in der Produktion. Die primäre Datenquelle für diese Pipeline ist das JSON-Lines-Format, gespeichert in Amazon Simple Storage-Service (Amazon S3) und nach Datum partitioniert. Dies führt täglich zur Generierung von Zehntausenden JSON-Lines-Dateien, wobei täglich inkrementelle Aktualisierungen erfolgen.

Das Hauptziel dieser Datenverarbeitungspipeline besteht darin, die Erstellung von Funktionen zu erleichtern, die für die Schulung und Bereitstellung der Jobempfehlungsmaschine auf Talent.com erforderlich sind. Es ist erwähnenswert, dass diese Pipeline inkrementelle Aktualisierungen unterstützen und die komplexen Anforderungen an die Merkmalsextraktion erfüllen muss, die für die Schulungs- und Bereitstellungsmodule erforderlich sind, die für das Stellenempfehlungssystem unerlässlich sind. Unsere Pipeline gehört zur allgemeinen ETL-Prozessfamilie (Extrahieren, Transformieren und Laden), die Daten aus mehreren Quellen in einem großen, zentralen Repository zusammenführt.

Für weitere Einblicke, wie Talent.com und AWS gemeinsam modernste Techniken für die Verarbeitung natürlicher Sprache und Deep-Learning-Modelltrainings entwickelt haben Amazon Sage Maker Informationen zum Erstellen eines Jobempfehlungssystems finden Sie unter Vom Text zum Traumjob: Aufbau eines NLP-basierten Job-Empfehlers bei Talent.com mit Amazon SageMaker. Das System umfasst Feature Engineering, Deep-Learning-Modellarchitekturdesign, Hyperparameteroptimierung und Modellevaluierung, wobei alle Module mit Python ausgeführt werden.

Dieser Beitrag zeigt, wie wir mit SageMaker eine umfangreiche Datenverarbeitungspipeline zur Vorbereitung von Funktionen für die Jobempfehlungs-Engine bei Talent.com aufgebaut haben. Die resultierende Lösung ermöglicht es einem Datenwissenschaftler, die Feature-Extraktion in einem SageMaker-Notebook mithilfe von Python-Bibliotheken zu entwickeln, z Scikit-Lernen or PyTorchund dann den gleichen Code schnell in der Datenverarbeitungspipeline bereitzustellen und die Merkmalsextraktion in großem Maßstab durchzuführen. Die Lösung erfordert keine Portierung des Funktionsextraktionscodes für die Verwendung von PySpark, wie dies bei der Verwendung erforderlich ist AWS-Kleber als ETL-Lösung. Unsere Lösung kann ausschließlich von einem Data Scientist durchgängig mit nur einem SageMaker entwickelt und bereitgestellt werden und erfordert keine Kenntnisse anderer ETL-Lösungen, wie z AWS-Charge. Dies kann die Zeit, die für die Bereitstellung der Pipeline für maschinelles Lernen (ML) in der Produktion erforderlich ist, erheblich verkürzen. Die Pipeline wird über Python betrieben und lässt sich nahtlos in Workflows zur Merkmalsextraktion integrieren, wodurch sie an eine Vielzahl von Datenanalyseanwendungen anpassbar ist.

Lösungsüberblick

Übersicht über die ETL-Pipeline mit SageMaker Processing

Die Pipeline besteht aus drei Hauptphasen:

  1. Verwenden Sie ein Amazon SageMaker-Verarbeitung Job zur Verarbeitung von unformatierten JSONL-Dateien, die einem bestimmten Tag zugeordnet sind. Daten mehrerer Tage können gleichzeitig von separaten Verarbeitungsjobs verarbeitet werden.
  2. Verwenden AWS-Kleber für das Daten-Crawling nach der Verarbeitung mehrerer Tage an Daten.
  3. Laden Sie verarbeitete Features für einen angegebenen Datumsbereich mithilfe von SQL aus einem Amazonas Athena Erstellen Sie eine Tabelle, trainieren Sie dann das Job-Recommendator-Modell und stellen Sie es bereit.

Rohe JSONL-Dateien verarbeiten

Wir verarbeiten JSONL-Rohdateien für einen bestimmten Tag mithilfe eines SageMaker-Verarbeitungsauftrags. Der Job implementiert die Feature-Extraktion und Datenkomprimierung und speichert verarbeitete Features in Parquet-Dateien mit 1 Million Datensätzen pro Datei. Wir nutzen die CPU-Parallelisierung, um die Feature-Extraktion für jede JSONL-Rohdatei parallel durchzuführen. Die Verarbeitungsergebnisse jeder JSONL-Datei werden in einer separaten Parquet-Datei in einem temporären Verzeichnis gespeichert. Nachdem alle JSONL-Dateien verarbeitet wurden, führen wir eine Komprimierung Tausender kleiner Parquet-Dateien in mehrere Dateien mit 1 Million Datensätzen pro Datei durch. Die komprimierten Parquet-Dateien werden dann als Ausgabe des Verarbeitungsauftrags in Amazon S3 hochgeladen. Die Datenkomprimierung sorgt für effizientes Crawling und SQL-Abfragen in den nächsten Phasen der Pipeline.

Im Folgenden finden Sie den Beispielcode zum Planen eines SageMaker-Verarbeitungsauftrags für einen bestimmten Tag, beispielsweise den 2020, mithilfe des SageMaker SDK. Der Job liest rohe JSONL-Dateien von Amazon S01 (z. B. von s3://bucket/raw-data/2020/01/01) und speichert die komprimierten Parquet-Dateien in Amazon S3 (z. B. in s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

Die folgende Codeübersicht für das Hauptskript (processing_script.py), der den SageMaker-Verarbeitungsjob ausführt, lautet wie folgt:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

Skalierbarkeit ist ein Schlüsselmerkmal unserer Pipeline. Erstens können mehrere SageMaker-Verarbeitungsjobs verwendet werden, um Daten für mehrere Tage gleichzeitig zu verarbeiten. Zweitens vermeiden wir, die gesamten verarbeiteten oder Rohdaten auf einmal in den Speicher zu laden, während wir jeden angegebenen Tag Daten verarbeiten. Dies ermöglicht die Verarbeitung von Daten mithilfe von Instanztypen, die nicht die Datenmenge eines ganzen Tages im Primärspeicher aufnehmen können. Die einzige Anforderung besteht darin, dass der Instanztyp in der Lage sein sollte, N unformatierte JSONL- oder verarbeitete Parquet-Dateien gleichzeitig in den Speicher zu laden, wobei N die Anzahl der verwendeten Prozessarbeiter ist.

Crawlen Sie verarbeitete Daten mit AWS Glue

Nachdem alle Rohdaten für mehrere Tage verarbeitet wurden, können wir mithilfe eines AWS Glue-Crawlers eine Athena-Tabelle aus dem gesamten Datensatz erstellen. Wir benutzen das AWS SDK für Pandas (awswrangler) Bibliothek, um die Tabelle mit dem folgenden Snippet zu erstellen:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Laden Sie verarbeitete Features für das Training

Verarbeitete Features für einen bestimmten Datumsbereich können jetzt mithilfe von SQL aus der Athena-Tabelle geladen werden und diese Features können dann zum Trainieren des Job-Recommendator-Modells verwendet werden. Das folgende Snippet lädt beispielsweise einen Monat verarbeiteter Features mithilfe von in einen DataFrame awswrangler Bibliothek:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Darüber hinaus kann die Verwendung von SQL zum Laden verarbeiteter Funktionen für das Training erweitert werden, um verschiedene andere Anwendungsfälle abzudecken. Beispielsweise können wir eine ähnliche Pipeline anwenden, um zwei separate Athena-Tabellen zu verwalten: eine zum Speichern von Benutzerimpressionen und eine andere zum Speichern von Benutzerklicks auf diese Impressionen. Mithilfe von SQL-Join-Anweisungen können wir Impressionen abrufen, auf die Benutzer entweder geklickt oder nicht geklickt haben, und diese Impressionen dann an einen Modelltrainingsjob übergeben.

Lösungsvorteile

Die Implementierung der vorgeschlagenen Lösung bringt mehrere Vorteile für unseren bestehenden Workflow mit sich, darunter:

  • Vereinfachte Implementierung – Die Lösung ermöglicht die Implementierung der Feature-Extraktion in Python mithilfe beliebter ML-Bibliotheken. Und es ist nicht erforderlich, dass der Code in PySpark portiert wird. Dies optimiert die Feature-Extraktion, da derselbe Code, der von einem Datenwissenschaftler in einem Notebook entwickelt wurde, von dieser Pipeline ausgeführt wird.
  • Schneller Weg zur Produktion – Die Lösung kann von einem Datenwissenschaftler entwickelt und bereitgestellt werden, um eine Merkmalsextraktion in großem Maßstab durchzuführen und es ihm zu ermöglichen, ein ML-Empfehlungsmodell für diese Daten zu entwickeln. Gleichzeitig kann dieselbe Lösung von einem ML-Ingenieur mit nur wenigen erforderlichen Änderungen in der Produktion bereitgestellt werden.
  • Wiederverwendbarkeit – Die Lösung bietet ein wiederverwendbares Muster für die Merkmalsextraktion im großen Maßstab und kann problemlos für andere Anwendungsfälle angepasst werden, die über die Erstellung von Empfehlungsmodellen hinausgehen.
  • Effizienz – Die Lösung bietet eine gute Leistung: Verarbeitung eines einzigen Tages der talent.comDie Datenerfassung dauerte weniger als eine Stunde.
  • Inkrementelle Updates – Die Lösung unterstützt auch inkrementelle Updates. Neue tägliche Daten können mit einem SageMaker-Verarbeitungsauftrag verarbeitet werden, und der S3-Speicherort, der die verarbeiteten Daten enthält, kann erneut gecrawlt werden, um die Athena-Tabelle zu aktualisieren. Wir können auch einen Cronjob verwenden, um die heutigen Daten mehrmals täglich (z. B. alle 3 Stunden) zu aktualisieren.

Wir nutzten diese ETL-Pipeline, um Talent.com bei der Verarbeitung von 50,000 Dateien pro Tag mit 5 Millionen Datensätzen zu unterstützen, und erstellten Trainingsdaten mithilfe von Funktionen, die aus 90 Tagen Rohdaten von Talent.com extrahiert wurden – insgesamt 450 Millionen Datensätze in 900,000 Dateien. Unsere Pipeline half Talent.com dabei, das Empfehlungssystem innerhalb von nur zwei Wochen aufzubauen und in der Produktion bereitzustellen. Die Lösung führte alle ML-Prozesse einschließlich ETL auf Amazon SageMaker aus, ohne andere AWS-Dienste zu nutzen. Das Jobempfehlungssystem führte zu einer Steigerung der Klickrate bei Online-A/B-Tests um 2 % im Vergleich zu einer früheren XGBoost-basierten Lösung und trug dazu bei, Millionen von Talent.com-Nutzern bessere Jobs zu vermitteln.

Zusammenfassung

In diesem Beitrag wird die ETL-Pipeline beschrieben, die wir für die Funktionsverarbeitung zum Trainieren und Bereitstellen eines Job-Empfehlungsmodells bei Talent.com entwickelt haben. Unsere Pipeline nutzt SageMaker-Verarbeitungsjobs für eine effiziente Datenverarbeitung und Merkmalsextraktion in großem Maßstab. Der Feature-Extraktionscode ist in Python implementiert und ermöglicht die Verwendung gängiger ML-Bibliotheken zur Durchführung der Feature-Extraktion in großem Maßstab, ohne dass der Code für die Verwendung von PySpark portiert werden muss.

Wir ermutigen die Leser, die Möglichkeit zu erkunden, die in diesem Blog vorgestellte Pipeline als Vorlage für ihre Anwendungsfälle zu verwenden, bei denen eine Funktionsextraktion im großen Maßstab erforderlich ist. Die Pipeline kann von einem Datenwissenschaftler genutzt werden, um ein ML-Modell zu erstellen, und dieselbe Pipeline kann dann von einem ML-Ingenieur zur Ausführung in der Produktion übernommen werden. Dies kann die Zeit, die für die End-to-End-Produktion der ML-Lösung benötigt wird, erheblich verkürzen, wie es bei Talent.com der Fall war. Die Leser können sich auf die beziehen Tutorial zum Einrichten und Ausführen von SageMaker-Verarbeitungsjobs. Wir verweisen die Leser auch darauf, den Beitrag anzusehen Vom Text zum Traumjob: Aufbau eines NLP-basierten Job-Empfehlers bei Talent.com mit Amazon SageMaker, wo wir die Verwendung von Deep-Learning-Modelltrainingstechniken besprechen Amazon Sage Maker Aufbau des Jobempfehlungssystems von Talent.com.


Über die Autoren

Dmitrij BespalowDmitrij Bespalow ist Senior Applied Scientist im Amazon Machine Learning Solutions Lab, wo er AWS-Kunden aus verschiedenen Branchen hilft, ihre KI- und Cloud-Einführung zu beschleunigen.

Yi XiangYi Xiang ist Applied Scientist II im Amazon Machine Learning Solutions Lab, wo sie AWS-Kunden aus verschiedenen Branchen dabei hilft, ihre KI- und Cloud-Einführung zu beschleunigen.

Tong WangTong Wang ist Senior Applied Scientist im Amazon Machine Learning Solutions Lab, wo er AWS-Kunden aus verschiedenen Branchen hilft, ihre KI- und Cloud-Einführung zu beschleunigen.

Anatoli KhomenkoAnatoli Khomenko ist Senior Machine Learning Engineer bei talent.com mit einer Leidenschaft für die Verarbeitung natürlicher Sprache, um gute Leute für gute Jobs zu finden.

Abdenour BezzouhAbdenour Bezzouh ist eine Führungskraft mit mehr als 25 Jahren Erfahrung in der Entwicklung und Bereitstellung von Technologielösungen, die für Millionen von Kunden skalierbar sind. Abdenour war Chief Technology Officer (CTO) bei talent.com als das AWS-Team diese spezielle Lösung für entworfen und ausgeführt hat talent.com.

Yanjun QiYanjun Qi ist Senior Applied Science Manager im Amazon Machine Learning Solution Lab. Sie ist innovativ und wendet maschinelles Lernen an, um AWS-Kunden dabei zu helfen, ihre KI- und Cloud-Einführung zu beschleunigen.

Zeitstempel:

Mehr von AWS Maschinelles Lernen