Amazon SageMaker を使用して Talent.com での ETL データ処理を合理化する | アマゾン ウェブ サービス

Amazon SageMaker を使用して Talent.com での ETL データ処理を合理化する | アマゾン ウェブ サービス

この投稿は、機械学習エンジニアの Anatoly Khomenko と Talent.com の最高技術責任者 Abdenour Bezzouh の共著です。

2011に設立され、 タレント.com は、クライアントからの有料求人情報と公開求人情報を集約し、簡単に検索できる統合プラットフォームを作成しました。 Talent.com は、30 か国以上、さまざまな言語、業界、流通チャネルにわたる 75 万件以上の求人情報をカバーし、求職者の多様なニーズに応え、何百万もの求職者と仕事の機会を効果的に結びつけます。

Talent.com の使命は、グローバルな従業員のつながりを促進することです。これを達成するために、Talent.com はウェブ上のさまざまなソースから求人情報を集約し、求職者が自分のスキルや経験に合わせた 30 万件を超える幅広い求人情報にアクセスできるようにしています。この使命に沿って、Talent.com は AWS と協力して、ユーザーのキャリアアップを支援することを目的として、ディープラーニングを活用した最先端の求人推奨エンジンを開発しました。

この求人推薦エンジンの効果的な運用を確保するには、Talent.com の集約された求人情報から機能を抽出して絞り込む役割を担う大規模なデータ処理パイプラインを実装することが重要です。このパイプラインは、毎日 5 万件のレコードを 1 時間以内に処理でき、複数日分のレコードを並行して処理できます。さらに、このソリューションにより、運用環境への迅速な導入が可能になります。このパイプラインの主なデータ ソースは、次の場所に保存されている JSON Lines 形式です。 Amazon シンプル ストレージ サービス (Amazon S3) であり、日付ごとにパーティション化されています。これにより、毎日数万の JSON Lines ファイルが生成され、増分更新が毎日行われます。

このデータ処理パイプラインの主な目的は、Talent.com での求人推薦エンジンのトレーニングと展開に必要な機能の作成を容易にすることです。このパイプラインは増分更新をサポートし、ジョブ推奨システムに不可欠なトレーニングおよび導入モジュールに必要な複雑な機能抽出要件に対応する必要があることに注意してください。私たちのパイプラインは、複数のソースからのデータを大規模な中央リポジトリに結合する一般的な ETL (抽出、変換、ロード) プロセス ファミリに属しています。

Talent.com と AWS がどのように協力して、最先端の自然言語処理と深層学習モデルのトレーニング技術を構築したかについてのさらなる洞察が必要です。 アマゾンセージメーカー 求人推薦システムを作成するには、を参照してください。 テキストから夢の仕事へ: Amazon SageMaker を使用して Talent.com で NLP ベースの仕事レコメンダーを構築する。このシステムには、特徴量エンジニアリング、深層学習モデル アーキテクチャ設計、ハイパーパラメータの最適化、モデル評価が含まれており、すべてのモジュールは Python を使用して実行されます。

この投稿では、SageMaker を使用して、Talent.com の求人推薦エンジンの機能を準備するための大規模なデータ処理パイプラインを構築する方法を説明します。結果として得られるソリューションにより、データ サイエンティストは、次のような Python ライブラリを使用して SageMaker ノートブックでの特徴抽出を考案できるようになります。 scikit-Learn or パイトーチ次に、同じコードをデータ処理パイプラインに迅速にデプロイして、大規模な特徴抽出を実行します。このソリューションでは、PySpark を使用する場合に必要となる、特徴抽出コードを移植して PySpark を使用する必要はありません。 AWSグルー ETL ソリューションとして。当社のソリューションは、SageMaker のみを使用してデータ サイエンティストのみがエンドツーエンドで開発およびデプロイでき、次のような他の ETL ソリューションの知識は必要ありません。 AWSバッチ。これにより、機械学習 (ML) パイプラインを実稼働環境にデプロイするのに必要な時間を大幅に短縮できます。このパイプラインは Python を通じて操作され、特徴抽出ワークフローとシームレスに統合され、幅広いデータ分析アプリケーションに適応できるようになります。

ソリューションの概要

SageMaker 処理を使用した ETL パイプラインの概要

パイプラインは 3 つの主要なフェーズで構成されます。

  1. を利用する Amazon SageMaker処理 指定された日に関連付けられた生の JSONL ファイルを処理するジョブ。複数日分のデータを、別々の処理ジョブで同時に処理できます。
  2. 雇用する AWSグルー 数日間のデータを処理した後のデータ クロール用。
  3. SQL を使用して、指定された日付範囲の処理されたフィーチャを アマゾンアテナ テーブルを作成し、ジョブ レコメンダー モデルをトレーニングしてデプロイします。

生の JSONL ファイルを処理する

SageMaker 処理ジョブを使用して、指定された日の生の JSONL ファイルを処理します。このジョブは特徴抽出とデータ圧縮を実装し、処理された特徴をファイルあたり 1 万レコードの Parquet ファイルに保存します。 CPU 並列化を利用して、生の JSONL ファイルごとに特徴抽出を並行して実行します。各 JSONL ファイルの処理結果は、一時ディレクトリ内の別の Parquet ファイルに保存されます。すべての JSONL ファイルが処理された後、数千の小さな Parquet ファイルを、ファイルあたり 1 万レコードを持つ複数のファイルに圧縮します。圧縮された Parquet ファイルは、処理ジョブの出力として Amazon S3 にアップロードされます。データ圧縮により、パイプラインの次の段階で効率的なクロールと SQL クエリが保証されます。

以下は、SageMaker SDK を使用して、指定した日 (2020-01-01 など) に SageMaker 処理ジョブをスケジュールするサンプル コードです。ジョブは、Amazon S3 (たとえば、 s3://bucket/raw-data/2020/01/01) 圧縮された Parquet ファイルを Amazon S3 に保存します (たとえば、 s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

次のメイン スクリプトのコード概要 (processing_script.pySageMaker Processing ジョブを実行する ) は次のとおりです。

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

スケーラビリティは私たちのパイプラインの重要な機能です。まず、複数の SageMaker 処理ジョブを使用して、数日間のデータを同時に処理できます。 2 番目に、指定した日ごとのデータを処理する際に、処理済みデータ全体または生データを一度にメモリにロードすることを回避します。これにより、プライマリ メモリに丸 1 日分のデータを収容できないインスタンス タイプを使用したデータの処理が可能になります。唯一の要件は、インスタンス タイプが N 個の生の JSONL ファイルまたは処理された Parquet ファイルを同時にメモリにロードできる必要があることです (N は使用中のプロセス ワーカーの数です)。

AWS Glue を使用して処理済みデータをクロールする

数日間のすべての生データが処理された後、AWS Glue クローラーを使用してデータセット全体から Athena テーブルを作成できます。私たちが使用するのは、 パンダ用 AWS SDK (awswrangler) ライブラリを使用して、次のスニペットを使用してテーブルを作成します。

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

トレーニング用に処理された特徴をロードする

指定した日付範囲の処理されたフィーチャを SQL を使用して Athena テーブルからロードできるようになり、これらのフィーチャをジョブ レコメンダー モデルのトレーニングに使用できるようになりました。たとえば、次のスニペットは、 awswrangler としょうかん:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

さらに、トレーニング用に処理された特徴をロードするための SQL の使用は、他のさまざまなユースケースに対応するように拡張できます。たとえば、同様のパイプラインを適用して 2 つの別個の Athena テーブルを管理できます。1 つはユーザーのインプレッションを保存するためのもので、もう 1 つはこれらのインプレッションに対するユーザーのクリックを保存するためのものです。 SQL 結合ステートメントを使用すると、ユーザーがクリックしたかクリックしなかったインプレッションを取得し、これらのインプレッションをモデル トレーニング ジョブに渡すことができます。

ソリューションのメリット

提案されたソリューションを実装すると、既存のワークフローに次のようないくつかの利点がもたらされます。

  • 簡素化された実装 – このソリューションにより、一般的な ML ライブラリを使用して Python で特徴抽出を実装できるようになります。また、コードを PySpark に移植する必要もありません。これにより、データ サイエンティストがノートブックで開発したのと同じコードがこのパイプラインで実行されるため、特徴抽出が効率化されます。
  • 本番環境への迅速なパス – このソリューションはデータ サイエンティストによって開発および展開され、大規模な特徴抽出を実行して、このデータに対して ML レコメンダー モデルを開発できるようになります。同時に、ML エンジニアは、ほとんど変更を加えることなく、同じソリューションを実稼働環境にデプロイできます。
  • 再利用性 – このソリューションは、大規模な特徴抽出のための再利用可能なパターンを提供し、レコメンダー モデルの構築以外の他のユースケースにも簡単に適応できます。
  • 効率化 – このソリューションは優れたパフォーマンスを提供し、1 日分のデータを処理します。 タレント.comさんのデータには 1 時間もかかりませんでした。
  • 増分更新 – このソリューションは増分更新もサポートしています。新しい日次データは SageMaker 処理ジョブで処理でき、処理されたデータを含む S3 の場所を再クロールして Athena テーブルを更新できます。 cron ジョブを使用して、3 日に数回 (たとえば、XNUMX 時間ごと) 今日のデータを更新することもできます。

この ETL パイプラインを使用して、Talent.com が 50,000 万レコードを含む 5 日あたり 90 個のファイルを処理できるようにし、Talent.com からの 450 日間の生データ (900,000 ファイルにわたる合計 2 億 8.6 万レコード) から抽出した特徴を使用してトレーニング データを作成しました。私たちのパイプラインにより、Talent.com はわずか XNUMX 週間でレコメンデーション システムを構築し、運用環境に導入することができました。このソリューションは、他の AWS サービスを利用せずに、ETL を含むすべての ML プロセスを Amazon SageMaker 上で実行しました。この求人推薦システムは、以前の XGBoost ベースのソリューションと比較して、オンライン A/B テストのクリックスルー率を XNUMX% 向上させ、数百万の Talent.com ユーザーをより良い求人に結びつけるのに役立ちました。

まとめ

この投稿では、Talent.com でジョブ レコメンダー モデルをトレーニングおよびデプロイするための機能処理のために開発した ETL パイプラインの概要を説明します。私たちのパイプラインは、大規模な効率的なデータ処理と特徴抽出のために SageMaker 処理ジョブを使用します。特徴抽出コードは Python で実装されているため、PySpark を使用するようにコードを移植することなく、一般的な ML ライブラリを使用して大規模な特徴抽出を実行できます。

読者の皆様には、大規模な機能抽出が必要なユースケースのテンプレートとして、このブログで紹介されているパイプラインを使用する可能性を検討することをお勧めします。データ サイエンティストはパイプラインを利用して ML モデルを構築でき、同じパイプラインを ML エンジニアが採用して本番環境で実行できます。これにより、Talent.com の場合と同様に、ML ソリューションをエンドツーエンドで製品化するのに必要な時間を大幅に短縮できます。読者は以下を参照できます。 SageMaker 処理ジョブのセットアップと実行に関するチュートリアル。読者にも投稿を閲覧するよう紹介します テキストから夢の仕事へ: Amazon SageMaker を使用して Talent.com で NLP ベースの仕事レコメンダーを構築する、ここでは、を活用した深層学習モデルのトレーニング手法について説明します。 アマゾンセージメーカー Talent.com の求人推薦システムを構築するため。


著者について

ドミトリー・ベスパロフドミトリー・ベスパロフ Amazon Machine Learning Solutions Lab の上級応用科学者であり、さまざまな業界の AWS のお客様が AI とクラウドの採用を加速するのを支援しています。

李翔李翔 Amazon Machine Learning Solutions Lab の応用科学者 II として、さまざまな業界の AWS の顧客が AI とクラウドの導入を加速できるよう支援しています。

ワン・トンワン・トン Amazon Machine Learning Solutions Lab の上級応用科学者であり、さまざまな業界の AWS のお客様が AI とクラウドの採用を加速するのを支援しています。

アナトリー・ホメンコアナトリー・ホメンコ のシニア機械学習エンジニアです。 タレント.com 良い人材と良い仕事をマッチングする自然言語処理に情熱を持っています。

アブデノール・ベズーアブデノール・ベズー は、数百万の顧客に規模を拡大するテクノロジー ソリューションを構築および提供する 25 年以上の経験を持つ幹部です。 アブデノール氏は、最高技術責任者 (CTO) の役職を歴任しました。 タレント.com AWS チームがこの特定のソリューションを設計および実行したとき タレント.com.

ヤンジュン・チーヤンジュン・チー Amazon Machine Learning Solution Lab の上級応用科学マネージャーです。 彼女は機械学習を革新して適用し、AWS のお客様が AI とクラウドの採用を加速できるように支援しています。

タイムスタンプ:

より多くの AWS機械学習