Optimización del procesamiento de datos ETL en Talent.com con Amazon SageMaker | Servicios web de Amazon

Optimización del procesamiento de datos ETL en Talent.com con Amazon SageMaker | Servicios web de Amazon

Los coautores de esta publicación son Anatoly Khomenko, ingeniero de aprendizaje automático, y Abdenour Bezzouh, director de tecnología de Talent.com.

Establecido en 2011, talento.com agrega ofertas de trabajo remuneradas de sus clientes y ofertas de trabajo públicas, y ha creado una plataforma unificada y de fácil búsqueda. Talent.com, que cubre más de 30 millones de ofertas de trabajo en más de 75 países y abarca varios idiomas, industrias y canales de distribución, satisface las diversas necesidades de los solicitantes de empleo, conectando de manera efectiva a millones de solicitantes de empleo con oportunidades laborales.

La misión de Talent.com es facilitar las conexiones globales de la fuerza laboral. Para lograr esto, Talent.com agrega ofertas de trabajo de varias fuentes en la web, ofreciendo a quienes buscan empleo acceso a un amplio conjunto de más de 30 millones de oportunidades laborales adaptadas a sus habilidades y experiencias. En línea con esta misión, Talent.com colaboró ​​con AWS para desarrollar un motor de recomendación laboral de vanguardia impulsado por el aprendizaje profundo, destinado a ayudar a los usuarios a avanzar en sus carreras.

Para garantizar el funcionamiento eficaz de este motor de recomendación de empleos, es fundamental implementar un proceso de procesamiento de datos a gran escala responsable de extraer y perfeccionar las características de las listas de empleos agregadas de Talent.com. Este canal es capaz de procesar 5 millones de registros diarios en menos de 1 hora y permite procesar varios días de registros en paralelo. Además, esta solución permite una rápida implementación en producción. La fuente principal de datos para esta canalización es el formato JSON Lines, almacenado en Servicio de almacenamiento simple de Amazon (Amazon S3) y particionado por fecha. Cada día, esto da como resultado la generación de decenas de miles de archivos de líneas JSON, con actualizaciones incrementales que se realizan diariamente.

El objetivo principal de este proceso de procesamiento de datos es facilitar la creación de funciones necesarias para capacitar e implementar el motor de recomendación de empleo en Talent.com. Vale la pena señalar que este proceso debe admitir actualizaciones incrementales y satisfacer los complejos requisitos de extracción de funciones necesarios para los módulos de capacitación e implementación esenciales para el sistema de recomendación de trabajos. Nuestro canal pertenece a la familia de procesos ETL (extracción, transformación y carga) general que combina datos de múltiples fuentes en un repositorio central grande.

Para obtener más información sobre cómo Talent.com y AWS crearon en colaboración técnicas de capacitación de modelos de aprendizaje profundo y procesamiento del lenguaje natural de vanguardia, utilizando Amazon SageMaker Para elaborar un sistema de recomendación de empleo, consulte Del texto al trabajo soñado: creación de un recomendador de empleo basado en PNL en Talent.com con Amazon SageMaker. El sistema incluye ingeniería de funciones, diseño de arquitectura de modelo de aprendizaje profundo, optimización de hiperparámetros y evaluación de modelo, donde todos los módulos se ejecutan utilizando Python.

Esta publicación muestra cómo utilizamos SageMaker para crear un proceso de procesamiento de datos a gran escala para preparar funciones para el motor de recomendación de empleos en Talent.com. La solución resultante permite a un científico de datos idear la extracción de funciones en un cuaderno de SageMaker utilizando bibliotecas de Python, como Scikit-Aprender or PyTorchy luego implementar rápidamente el mismo código en el proceso de procesamiento de datos realizando la extracción de características a escala. La solución no requiere portar el código de extracción de funciones para usar PySpark, como se requiere cuando se usa Pegamento AWS como solución ETL. Nuestra solución puede ser desarrollada e implementada únicamente por un científico de datos de extremo a extremo utilizando solo un SageMaker y no requiere conocimiento de otras soluciones ETL, como Lote de AWS. Esto puede acortar significativamente el tiempo necesario para implementar el proceso de aprendizaje automático (ML) en producción. La canalización se opera a través de Python y se integra perfectamente con los flujos de trabajo de extracción de características, lo que la hace adaptable a una amplia gama de aplicaciones de análisis de datos.

Resumen de la solución

Descripción general de la canalización ETL mediante el procesamiento de SageMaker

El oleoducto se compone de tres fases principales:

  1. utilizar un Procesamiento de Amazon SageMaker trabajo para manejar archivos JSONL sin procesar asociados con un día específico. Se pueden procesar varios días de datos mediante trabajos de procesamiento separados simultáneamente.
  2. Emplear Pegamento AWS para el rastreo de datos después de procesar varios días de datos.
  3. Cargue funciones procesadas para un rango de fechas específico utilizando SQL desde un Atenea amazónica tabla, luego entrene e implemente el modelo de recomendación de trabajos.

Procesar archivos JSONL sin formato

Procesamos archivos JSONL sin formato para un día específico utilizando un trabajo de procesamiento de SageMaker. El trabajo implementa la extracción de características y la compactación de datos, y guarda las características procesadas en archivos Parquet con 1 millón de registros por archivo. Aprovechamos la paralelización de la CPU para realizar la extracción de funciones para cada archivo JSONL sin formato en paralelo. Los resultados del procesamiento de cada archivo JSONL se guardan en un archivo Parquet separado dentro de un directorio temporal. Una vez procesados ​​todos los archivos JSONL, realizamos la compactación de miles de pequeños archivos Parquet en varios archivos con 1 millón de registros por archivo. Luego, los archivos Parquet compactados se cargan en Amazon S3 como resultado del trabajo de procesamiento. La compactación de datos garantiza un rastreo y consultas SQL eficientes en las siguientes etapas del proceso.

El siguiente es el código de muestra para programar un trabajo de procesamiento de SageMaker para un día específico, por ejemplo, 2020-01-01, utilizando el SDK de SageMaker. El trabajo lee archivos JSONL sin formato de Amazon S3 (por ejemplo, de s3://bucket/raw-data/2020/01/01) y guarda los archivos Parquet compactados en Amazon S3 (por ejemplo, en s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

El siguiente esquema de código para el script principal (processing_script.py) que ejecuta el trabajo de procesamiento de SageMaker es el siguiente:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

La escalabilidad es una característica clave de nuestra cartera. En primer lugar, se pueden utilizar varios trabajos de procesamiento de SageMaker para procesar datos durante varios días simultáneamente. En segundo lugar, evitamos cargar todos los datos procesados ​​o sin procesar en la memoria a la vez, mientras procesamos cada día de datos específico. Esto permite el procesamiento de datos utilizando tipos de instancias que no pueden acomodar los datos de un día completo en la memoria primaria. El único requisito es que el tipo de instancia sea capaz de cargar N archivos JSONL sin procesar o archivos Parquet procesados ​​en la memoria simultáneamente, siendo N el número de trabajadores de proceso en uso.

Rastrear datos procesados ​​con AWS Glue

Una vez procesados ​​todos los datos sin procesar de varios días, podemos crear una tabla de Athena a partir de todo el conjunto de datos utilizando un rastreador de AWS Glue. Usamos el SDK de AWS para pandas (awswrangler) biblioteca para crear la tabla usando el siguiente fragmento:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Cargar funciones procesadas para entrenamiento

Las funciones procesadas para un rango de fechas específico ahora se pueden cargar desde la tabla de Athena usando SQL, y estas funciones luego se pueden usar para entrenar el modelo de recomendación de trabajos. Por ejemplo, el siguiente fragmento carga un mes de funciones procesadas en un DataFrame usando el awswrangler biblioteca:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Además, el uso de SQL para cargar funciones procesadas para capacitación se puede ampliar para dar cabida a otros casos de uso. Por ejemplo, podemos aplicar una canalización similar para mantener dos tablas de Athena separadas: una para almacenar las impresiones de los usuarios y otra para almacenar los clics de los usuarios en estas impresiones. Al utilizar declaraciones de unión SQL, podemos recuperar impresiones en las que los usuarios hicieron clic o en las que no hicieron clic y luego pasar estas impresiones a un trabajo de capacitación modelo.

Beneficios de la solución

La implementación de la solución propuesta aporta varias ventajas a nuestro flujo de trabajo existente, que incluyen:

  • Implementación simplificada – La solución permite implementar la extracción de funciones en Python utilizando bibliotecas de aprendizaje automático populares. Y no requiere que el código se transfiera a PySpark. Esto agiliza la extracción de funciones, ya que esta canalización ejecutará el mismo código desarrollado por un científico de datos en un cuaderno.
  • Camino rápido a la producción – La solución puede ser desarrollada e implementada por un científico de datos para realizar la extracción de características a escala, lo que les permite desarrollar un modelo de recomendación de ML con estos datos. Al mismo tiempo, un ingeniero de aprendizaje automático puede implementar la misma solución en producción con pocas modificaciones necesarias.
  • Reutilización – La solución proporciona un patrón reutilizable para la extracción de características a escala y se puede adaptar fácilmente para otros casos de uso más allá de la creación de modelos de recomendación.
  • Eficiencia – La solución ofrece un buen rendimiento: procesar un solo día del talento.comLos datos tardaron menos de 1 hora.
  • Actualizaciones incrementales – La solución también admite actualizaciones incrementales. Se pueden procesar nuevos datos diarios con un trabajo de procesamiento de SageMaker y se puede volver a rastrear la ubicación de S3 que contiene los datos procesados ​​para actualizar la tabla de Athena. También podemos usar un trabajo cron para actualizar los datos de hoy varias veces al día (por ejemplo, cada 3 horas).

Usamos este canal ETL para ayudar a Talent.com a procesar 50,000 5 archivos por día que contienen 90 millones de registros y creamos datos de capacitación utilizando funciones extraídas de 450 días de datos sin procesar de Talent.com: un total de 900,000 millones de registros en 2 8.6 archivos. Nuestro proceso ayudó a Talent.com a construir e implementar el sistema de recomendación en producción en solo XNUMX semanas. La solución realizó todos los procesos de aprendizaje automático, incluido ETL, en Amazon SageMaker sin utilizar otro servicio de AWS. El sistema de recomendación de empleo impulsó un aumento del XNUMX % en la tasa de clics en las pruebas A/B en línea en comparación con una solución anterior basada en XGBoost, lo que ayudó a conectar a millones de usuarios de Talent.com con mejores trabajos.

Conclusión

Esta publicación describe el proceso de ETL que desarrollamos para el procesamiento de funciones para la capacitación y la implementación de un modelo de recomendación de empleo en Talent.com. Nuestro canal utiliza trabajos de procesamiento de SageMaker para el procesamiento eficiente de datos y la extracción de funciones a gran escala. El código de extracción de funciones se implementa en Python, lo que permite el uso de bibliotecas de aprendizaje automático populares para realizar la extracción de funciones a escala, sin la necesidad de migrar el código para usar PySpark.

Alentamos a los lectores a explorar la posibilidad de utilizar el canal presentado en este blog como plantilla para sus casos de uso donde se requiere extracción de características a escala. Un científico de datos puede aprovechar la canalización para crear un modelo de aprendizaje automático, y luego un ingeniero de aprendizaje automático puede adoptar la misma canalización para ejecutarla en producción. Esto puede reducir significativamente el tiempo necesario para producir la solución de aprendizaje automático de un extremo a otro, como fue el caso de Talent.com. Los lectores pueden consultar el tutorial para configurar y ejecutar trabajos de procesamiento de SageMaker. También recomendamos a los lectores que vean la publicación. Del texto al trabajo soñado: creación de un recomendador de empleo basado en PNL en Talent.com con Amazon SageMaker, donde analizamos técnicas de entrenamiento de modelos de aprendizaje profundo que utilizan Amazon SageMaker para construir el sistema de recomendación de empleo de Talent.com.


Sobre los autores

Dmitri BespalovDmitri Bespalov es científico aplicado sénior en el laboratorio de soluciones de aprendizaje automático de Amazon, donde ayuda a los clientes de AWS de diferentes industrias a acelerar su adopción de la inteligencia artificial y la nube.

Yi XiangYi Xiang es científica aplicada II en el laboratorio de soluciones de aprendizaje automático de Amazon, donde ayuda a los clientes de AWS de diferentes industrias a acelerar su adopción de la IA y la nube.

Ton WangTon Wang es científico aplicado sénior en el laboratorio de soluciones de aprendizaje automático de Amazon, donde ayuda a los clientes de AWS de diferentes industrias a acelerar su adopción de la inteligencia artificial y la nube.

Anatoli JomenkoAnatoli Jomenko es ingeniero sénior de aprendizaje automático en talento.com con una pasión por el procesamiento del lenguaje natural que une buenas personas con buenos trabajos.

Abdenour BezzouhAbdenour Bezzouh es un ejecutivo con más de 25 años de experiencia en la creación y entrega de soluciones tecnológicas que escalan a millones de clientes. Abdenour ocupó el cargo de Director de Tecnología (CTO) en talento.com cuando el equipo de AWS diseñó y ejecutó esta solución particular para talento.com.

yanjun qiyanjun qi es gerente sénior de ciencias aplicadas en Amazon Machine Learning Solution Lab. Ella innova y aplica el aprendizaje automático para ayudar a los clientes de AWS a acelerar su adopción de la inteligencia artificial y la nube.

Sello de tiempo:

Mas de Aprendizaje automático de AWS