使用 Amazon SageMaker 简化 Talent.com 的 ETL 数据处理 | 亚马逊网络服务

使用 Amazon SageMaker 简化 Talent.com 的 ETL 数据处理 | 亚马逊网络服务

本文由机器学习工程师 Anatoly Khomenko 和 Talent.com 首席技术官 Abdenour Bezzouh 共同撰写。

成立于2011年 人才网 聚合来自客户的付费职位列表和公共职位列表,并创建了一个统一的、易于搜索的平台。 Talent.com覆盖超过30个国家、超过75万条招聘信息,跨越多种语言、多种行业、多种分销渠道,满足求职者的多样化需求,有效地将数百万求职者与工作机会联系起来。

Talent.com 的使命是促进全球劳动力联系。为了实现这一目标,Talent.com 汇总了网络上各种来源的职位列表,为求职者提供了超过 30 万个根据其技能和经验量身定制的广泛就业机会。秉承这一使命,Talent.com 与 A​​WS 合作开发了由深度学习驱动的尖端职位推荐引擎,旨在帮助用户提升职业生涯。

为了确保这个职位推荐引擎的有效运行,至关重要的是实现一个大规模的数据处理管道,负责从 Talent.com 的聚合职位列表中提取和提炼特征。该管道能够在不到 5 小时的时间内处理 1 万条每日记录,并允许并行处理多天的记录。此外,该解决方案还可以快速部署到生产环境。该管道的主要数据源是 JSON Lines 格式,存储在 亚马逊简单存储服务 (Amazon S3) 并按日期分区。每天,这都会生成数以万计的 JSON Lines 文件,并且每天都会进行增量更新。

该数据处理管道的主要目标是促进创建在 Talent.com 上培训和部署职位推荐引擎所需的功能。值得注意的是,该管道必须支持增量更新,并满足职位推荐系统所必需的培训和部署模块所需的复杂特征提取要求。我们的管道属于通用 ETL(提取、转换和加载)流程系列,它将来自多个源的数据组合到一个大型中央存储库中。

如需进一步了解 Talent.com 和 AWS 如何协作构建尖端的自然语言处理和深度学习模型训练技术,请利用 亚马逊SageMaker 要制作工作推荐系统,请参阅 从文本到梦想工作:使用 Amazon SageMaker 在 Talent.com 上构建基于 NLP 的职位推荐系统。该系统包括特征工程、深度学习模型架构设计、超参数优化和模型评估,所有模块均使用Python运行。

这篇文章展示了我们如何使用 SageMaker 构建大规模数据处理管道,为 Talent.com 的职位推荐引擎准备功能。由此产生的解决方案使数据科学家能够使用 Python 库在 SageMaker 笔记本中构思特征提取,例如 Scikit学习 or PyTorch,然后将相同的代码快速部署到数据处理管道中,大规模执行特征提取。该方案不需要移植特征提取代码来使用PySpark,使用时需要 AWS胶水 作为 ETL 解决方案。我们的解决方案可以由数据科学家仅使用 SageMaker 进行端到端开发和部署,并且不需要其他 ETL 解决方案的知识,例如 AWS批处理。这可以显着缩短将机器学习 (ML) 管道部署到生产所需的时间。该管道通过 Python 进行操作,并与特征提取工作流程无缝集成,使其适用于各种数据分析应用程序。

解决方案概述

使用 SageMaker Process 的 ETL 管道概述

该管道由三个主要阶段组成:

  1. 利用一个 Amazon SageMaker处理 处理与指定日期关联的原始 JSONL 文件的作业。多个天的数据可以同时由单独的处理作业处理。
  2. 采用 AWS胶水 用于处理多天数据后的数据爬取。
  3. 使用 SQL 从指定日期范围加载已处理的要素 亚马逊雅典娜 表,然后训练和部署工作推荐模型。

处理原始 JSONL 文件

我们使用 SageMaker 处理作业处理指定日期的原始 JSONL 文件。该作业实现特征提取和数据压缩,并将处理后的特征保存到Parquet文件中,每个文件有1万条记录。我们利用 CPU 并行化来并行执行每个原始 JSONL 文件的特征提取。每个 JSONL 文件的处理结果都保存到临时目录内的单独 Parquet 文件中。处理完所有 JSONL 文件后,我们将数千个小 Parquet 文件压缩为多个文件,每个文件包含 1 万条记录。然后,压缩后的 Parquet 文件将作为处理作业的输出上传到 Amazon S3。数据压缩确保管道下一阶段的高效爬行和 SQL 查询。

以下是使用 SageMaker SDK 在指定日期(例如 2020 年 01 月 01 日)安排 SageMaker 处理作业的示例代码。该作业从 Amazon S3 读取原始 JSONL 文件(例如从 s3://bucket/raw-data/2020/01/01)并将压缩后的 Parquet 文件保存到 Amazon S3 中(例如 s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

主脚本的代码概要如下(processing_script.py)运行 SageMaker 处理作业如下:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

可扩展性是我们管道的一个关键特征。首先,多个 SageMaker 处理作业可用于同时处理几天的数据。其次,我们避免在处理每个指定日期的数据时将整个处理后的数据或原始数据一次性加载到内存中。这使得能够使用主内存中无法容纳一整天数据的实例类型来处理数据。唯一的要求是实例类型应该能够同时将 N 个原始 JSONL 或处理后的 Parquet 文件加载到内存中,其中 N 是正在使用的进程工作线程的数量。

使用 AWS Glue 抓取已处理的数据

处理完多天的所有原始数据后,我们可以使用 AWS Glue 爬网程序从整个数据集创建 Athena 表。我们使用 适用于 pandas 的 AWS 开发工具包 (awswrangler) 使用以下代码片段创建表的库:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

加载已处理的特征进行训练

现在可以使用 SQL 从 Athena 表加载指定日期范围内的已处理特征,然后这些特征可用于训练职位推荐模型。例如,以下代码片段使用以下方法将一个月处理后的特征加载到 DataFrame 中: awswrangler 图书馆:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

此外,可以扩展使用 SQL 加载处理后的特征进行训练,以适应各种其他用例。例如,我们可以应用类似的管道来维护两个单独的 Athena 表:一个用于存储用户印象,另一个用于存储用户对这些印象的点击。使用 SQL 连接语句,我们可以检索用户点击或未点击的印象,然后将这些印象传递给模型训练作业。

解决方案优势

实施建议的解决方案给我们现有的工作流程带来了几个优势,包括:

  • 简化实施 – 该解决方案支持使用流行的 ML 库在 Python 中实现特征提取。而且,它不需要将代码移植到 PySpark 中。这简化了特征提取,因为数据科学家在笔记本中开发的相同代码将由该管道执行。
  • 快速生产路径 – 该解决方案可以由数据科学家开发和部署,以大规模执行特征提取,使他们能够针对这些数据开发 ML 推荐模型。同时,机器学习工程师只需很少的修改即可将相同的解决方案部署到生产中。
  • 雷乌斯能力 – 该解决方案为大规模特征提取提供了可重用的模式,并且可以轻松适应构建推荐模型之外的其他用例。
  • 效率 – 该解决方案提供了良好的性能:处理一天的 人才网的数据花费了不到 1 小时。
  • 增量更新 – 该解决方案还支持增量更新。可以使用 SageMaker 处理作业处理新的每日数据,并且可以重新爬网包含已处理数据的 S3 位置以更新 Athena 表。我们还可以使用 cron 作业每天多次更新今天的数据(例如每 3 小时)。

我们使用此 ETL 管道帮助 Talent.com 每天处理 50,000 个文件,其中包含 5 万条记录,并使用从 Talent.com 的 90 天原始数据(450 个文件中总共 900,000 亿条记录)中提取的特征创建训练数据。我们的管道帮助 Talent.com 在短短两周内构建了推荐系统并将其部署到生产中。该解决方案在 Amazon SageMaker 上执行了包括 ETL 在内的所有 ML 流程,而无需使用其他 AWS 服务。与之前基于 XGBoost 的解决方案相比,职位推荐系统使在线 A/B 测试的点击率提高了 2%,帮助数百万 Talent.com 用户找到了更好的职位。

结论

这篇文章概述了我们为在 Talent.com 上训练和部署职位推荐模型而开发的特征处理 ETL 管道。我们的管道使用 SageMaker 处理作业来进行大规模的高效数据处理和特征提取。特征提取代码以 Python 实现,支持使用流行的 ML 库大规模执行特征提取,无需移植代码即可使用 PySpark。

我们鼓励读者探索使用本博客中提供的管道作为需要大规模特征提取的用例的模板的可能性。数据科学家可以利用该管道来构建机器学习模型,然后机器学习工程师可以采用相同的管道在生产中运行。这可以显着减少端到端 ML 解决方案产品化所需的时间,就像 Talent.com 的情况一样。读者可以参考 设置和运行 SageMaker 处理作业的教程。我们也建议读者查看该帖子 从文本到梦想工作:使用 Amazon SageMaker 在 Talent.com 上构建基于 NLP 的职位推荐系统,我们在这里讨论利用深度学习模型训练技术 亚马逊SageMaker 建立Talent.com的职位推荐系统。


关于作者

德米特里·贝斯帕洛夫德米特里·贝斯帕洛夫 是亚马逊机器学习解决方案实验室的高级应用科学家,他帮助不同行业的 AWS 客户加速人工智能和云的采用。

易翔易翔 是 Amazon 机器学习解决方案实验室的一名应用科学家 II,她帮助不同行业的 AWS 客户加速 AI 和云的采用。

王彤王彤 是亚马逊机器学习解决方案实验室的高级应用科学家,他帮助不同行业的 AWS 客户加速人工智能和云的采用。

阿纳托利·霍缅科阿纳托利·霍缅科 是高级机器学习工程师 人才网 热衷于自然语言处理,为优秀人才找到好工作。

阿布德努尔·贝祖阿布德努尔·贝祖 是一位拥有超过 25 年构建和交付可扩展到数百万客户的技术解决方案经验的高管。 Abdenour 担任首席技术官 (CTO) 人才网 当 AWS 团队设计并执行这个特定的解决方案时 人才网.

严俊奇严俊奇 是亚马逊机器学习解决方案实验室的高级应用科学经理。 她创新并应用机器学习来帮助 AWS 客户加快 AI 和云的采用。

时间戳记:

更多来自 AWS机器学习