Simplificando o processamento de dados ETL na Talent.com com Amazon SageMaker | Amazon Web Services

Simplificando o processamento de dados ETL na Talent.com com Amazon SageMaker | Amazon Web Services

Esta postagem é de coautoria de Anatoly Khomenko, engenheiro de aprendizado de máquina, e Abdenour Bezzouh, diretor de tecnologia da Talent.com.

Fundada em 2011, talento.com agrega listas de empregos pagos de seus clientes e listas de empregos públicos e criou uma plataforma unificada e facilmente pesquisável. Cobrindo mais de 30 milhões de listas de empregos em mais de 75 países e abrangendo vários idiomas, indústrias e canais de distribuição, Talent.com atende às diversas necessidades dos candidatos a emprego, conectando efetivamente milhões de candidatos a oportunidades de emprego.

A missão da Talent.com é facilitar as conexões globais da força de trabalho. Para conseguir isso, Talent.com agrega listas de empregos de várias fontes na web, oferecendo aos candidatos acesso a um extenso conjunto de mais de 30 milhões de oportunidades de emprego adaptadas às suas habilidades e experiências. Em linha com esta missão, a Talent.com colaborou com a AWS para desenvolver um motor de recomendação de emprego de última geração, impulsionado pela aprendizagem profunda, destinado a ajudar os utilizadores a progredir nas suas carreiras.

Para garantir o funcionamento eficaz deste mecanismo de recomendação de empregos, é crucial implementar um pipeline de processamento de dados em grande escala responsável por extrair e refinar recursos das listas de empregos agregadas da Talent.com. Este pipeline é capaz de processar 5 milhões de registros diários em menos de 1 hora e permite processar vários dias de registros em paralelo. Além disso, esta solução permite uma rápida implantação em produção. A principal fonte de dados para este pipeline é o formato JSON Lines, armazenado em Serviço de armazenamento simples da Amazon (Amazon S3) e particionado por data. A cada dia, isso resulta na geração de dezenas de milhares de arquivos JSON Lines, com atualizações incrementais ocorrendo diariamente.

O objetivo principal deste pipeline de processamento de dados é facilitar a criação de recursos necessários para treinamento e implantação do mecanismo de recomendação de empregos no Talent.com. É importante notar que esse pipeline deve suportar atualizações incrementais e atender aos intrincados requisitos de extração de recursos necessários para os módulos de treinamento e implantação essenciais para o sistema de recomendação de empregos. Nosso pipeline pertence à família geral de processos ETL (extrair, transformar e carregar) que combina dados de várias fontes em um grande repositório central.

Para obter mais informações sobre como a Talent.com e a AWS construíram de forma colaborativa processamento de linguagem natural de ponta e técnicas de treinamento de modelo de aprendizagem profunda, utilizando Amazon Sage Maker para criar um sistema de recomendação de trabalho, consulte Do texto ao emprego dos sonhos: criando um recomendador de empregos baseado em PNL em Talent.com com Amazon SageMaker. O sistema inclui engenharia de recursos, design de arquitetura de modelo de aprendizado profundo, otimização de hiperparâmetros e avaliação de modelo, onde todos os módulos são executados usando Python.

Esta postagem mostra como usamos o SageMaker para construir um pipeline de processamento de dados em grande escala para preparar recursos para o mecanismo de recomendação de empregos em Talent.com. A solução resultante permite que um cientista de dados idealize a extração de recursos em um notebook SageMaker usando bibliotecas Python, como scikit-learn or PyTorche, em seguida, implantar rapidamente o mesmo código no pipeline de processamento de dados, realizando a extração de recursos em escala. A solução não requer a portabilidade do código de extração de recursos para usar o PySpark, conforme necessário ao usar Cola AWS como a solução ETL. Nossa solução pode ser desenvolvida e implantada exclusivamente por um Cientista de Dados de ponta a ponta usando apenas um SageMaker e não requer conhecimento de outras soluções ETL, como Lote da AWS. Isso pode reduzir significativamente o tempo necessário para implantar o pipeline de Machine Learning (ML) na produção. O pipeline é operado por meio de Python e integra-se perfeitamente aos fluxos de trabalho de extração de recursos, tornando-o adaptável a uma ampla variedade de aplicativos de análise de dados.

Visão geral da solução

Visão geral do pipeline ETL usando SageMaker Processing

O pipeline é composto por três fases principais:

  1. Utilize um Processamento do Amazon SageMaker job para lidar com arquivos JSONL brutos associados a um dia especificado. Vários dias de dados podem ser processados ​​simultaneamente por trabalhos de processamento separados.
  2. Empregar Cola AWS para rastreamento de dados após o processamento de vários dias de dados.
  3. Carregue recursos processados ​​para um intervalo de datas especificado usando SQL de um Amazona atena tabela e, em seguida, treinar e implantar o modelo de recomendação de trabalho.

Processar arquivos JSONL brutos

Processamos arquivos JSONL brutos para um dia específico usando um trabalho do SageMaker Processing. O trabalho implementa extração de recursos e compactação de dados e salva recursos processados ​​em arquivos Parquet com 1 milhão de registros por arquivo. Aproveitamos a paralelização da CPU para realizar a extração de recursos para cada arquivo JSONL bruto em paralelo. Os resultados do processamento de cada arquivo JSONL são salvos em um arquivo Parquet separado dentro de um diretório temporário. Após o processamento de todos os arquivos JSONL, realizamos a compactação de milhares de pequenos arquivos Parquet em vários arquivos com 1 milhão de registros por arquivo. Os arquivos Parquet compactados são então carregados no Amazon S3 como saída do trabalho de processamento. A compactação de dados garante rastreamento eficiente e consultas SQL nas próximas etapas do pipeline.

A seguir está o código de exemplo para agendar um trabalho de processamento do SageMaker para um dia especificado, por exemplo 2020-01-01, usando o SDK do SageMaker. O trabalho lê arquivos JSONL brutos do Amazon S3 (por exemplo, do s3://bucket/raw-data/2020/01/01) e salva os arquivos Parquet compactados no Amazon S3 (por exemplo, para s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies %pip install sagemaker pyarrow s3fs awswrangler import sagemaker
import boto3 from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput region = boto3.session.Session().region_name
role = get_execution_role()
bucket = sagemaker.Session().default_bucket() ### we use instance with 16 CPUs and 128 GiB memory
### note that the script will NOT load the entire data into memory during compaction
### depending on the size of individual jsonl files, larger instance may be needed
instance = "ml.r5.4xlarge"
n_jobs = 8 ### we use 8 process workers
date = "2020-01-01" ### process data for one day est_cls = SKLearn
framework_version_str = "0.20.0" ### schedule processing job
script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,
) script_processor.run( code="processing_script.py", ### name of the main processing script source_dir="../src/etl/", ### location of source code directory ### our processing script loads raw jsonl files directly from S3 ### this avoids long start-up times of the processing jobs, ### since raw data does not need to be copied into instance inputs=[], ### processing job input is empty outputs=[ ProcessingOutput(destination="s3://bucket/processed/table-name/", source="/opt/ml/processing/output"), ], arguments=[ ### directory with job's output "--output", "/opt/ml/processing/output", ### temporary directory inside instance "--tmp_output", "/opt/ml/tmp_output", "--n_jobs", str(n_jobs), ### number of process workers "--date", date, ### date to process ### location with raw jsonl files in S3 "--path", "s3://bucket/raw-data/", ], wait=False
)

O seguinte esboço de código para o script principal (processing_script.py) que executa o trabalho de processamento do SageMaker é o seguinte:

import concurrent
import pyarrow.dataset as ds
import os
import s3fs
from pathlib import Path ### function to process raw jsonl file and save extracted features into parquet file from process_data import process_jsonl ### parse command line arguments
args = parse_args() ### we use s3fs to crawl S3 input path for raw jsonl files
fs = s3fs.S3FileSystem()
### we assume raw jsonl files are stored in S3 directories partitioned by date
### for example: s3://bucket/raw-data/2020/01/01/
jsons = fs.find(os.path.join(args.path, *args.date.split('-'))) ### temporary directory location inside the Processing job instance
tmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}") ### directory location with job's output
out_dir = os.path.join(args.output, f"day_partition={args.date}") ### process individual jsonl files in parallel using n_jobs process workers
futures=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file = Path(file) out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet") ### process_jsonl function reads raw jsonl file from S3 location (inp_file) ### and saves result into parquet file (out_file) inside temporary directory futures.append(executor.submit(process_jsonl, file, out_file)) ### wait until all jsonl files are processed for future in concurrent.futures.as_completed(futures): result = future.result() ### compact parquet files
dataset = ds.dataset(tmp_out) if len(dataset.schema) > 0: ### save compacted parquet files with 1MM records per file ds.write_dataset(dataset, out_dir, format="parquet", max_rows_per_file=1024 * 1024)

A escalabilidade é uma característica fundamental do nosso pipeline. Primeiro, vários trabalhos do SageMaker Processing podem ser usados ​​para processar dados simultaneamente durante vários dias. Em segundo lugar, evitamos carregar todos os dados processados ​​ou brutos na memória de uma só vez, enquanto processamos cada dia especificado de dados. Isso permite o processamento de dados usando tipos de instância que não podem acomodar dados de um dia inteiro na memória primária. O único requisito é que o tipo de instância seja capaz de carregar N arquivos JSONL brutos ou arquivos Parquet processados ​​na memória simultaneamente, sendo N o número de trabalhadores do processo em uso.

Rastrear dados processados ​​usando AWS Glue

Depois que todos os dados brutos de vários dias forem processados, podemos criar uma tabela Athena a partir de todo o conjunto de dados usando um rastreador AWS Glue. Nós usamos o AWS SDK para pandas (awswrangler) biblioteca para criar a tabela usando o seguinte trecho:

import awswrangler as wr ### crawl processed data in S3
res = wr.s3.store_parquet_metadata( path='s3://bucket/processed/table-name/', database="database_name", table="table_name", dataset=True, mode="overwrite", sampling=1.0, path_suffix='.parquet',
) ### print table schema
print(res[0])

Carregar recursos processados ​​para treinamento

Os recursos processados ​​para um intervalo de datas especificado agora podem ser carregados da tabela Athena usando SQL e esses recursos podem ser usados ​​para treinar o modelo de recomendação de tarefas. Por exemplo, o trecho a seguir carrega um mês de recursos processados ​​em um DataFrame usando o awswrangler biblioteca:

import awswrangler as wr query = """ SELECT * FROM table_name WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """ ### load 1 month of data from database_name.table_name into a DataFrame
df = wr.athena.read_sql_query(query, database='database_name')

Além disso, o uso de SQL para carregar recursos processados ​​para treinamento pode ser estendido para acomodar vários outros casos de uso. Por exemplo, podemos aplicar um pipeline semelhante para manter duas tabelas Athena separadas: uma para armazenar impressões de usuários e outra para armazenar cliques de usuários nessas impressões. Usando instruções de junção SQL, podemos recuperar impressões nas quais os usuários clicaram ou não e, em seguida, passar essas impressões para um trabalho de treinamento de modelo.

Benefícios da solução

A implementação da solução proposta traz diversas vantagens ao nosso fluxo de trabalho existente, incluindo:

  • Implementação simplificada – A solução permite que a extração de recursos seja implementada em Python usando bibliotecas populares de ML. E não exige que o código seja portado para o PySpark. Isso agiliza a extração de recursos, pois o mesmo código desenvolvido por um Cientista de Dados em um notebook será executado por este pipeline.
  • Caminho rápido para a produção – A solução pode ser desenvolvida e implantada por um cientista de dados para realizar a extração de recursos em escala, permitindo-lhes desenvolver um modelo de recomendação de ML com base nesses dados. Ao mesmo tempo, a mesma solução pode ser implantada em produção por um engenheiro de ML com poucas modificações necessárias.
  • Reutilização – A solução fornece um padrão reutilizável para extração de recursos em escala e pode ser facilmente adaptada para outros casos de uso além da construção de modelos de recomendação.
  • Eficiência – A solução oferece bom desempenho: processando um único dia do talento.comos dados demoraram menos de 1 hora.
  • Atualizações incrementais – A solução também oferece suporte a atualizações incrementais. Novos dados diários podem ser processados ​​com um trabalho de processamento do SageMaker, e o local do S3 que contém os dados processados ​​pode ser rastreado novamente para atualizar a tabela do Athena. Também podemos usar um cron job para atualizar os dados de hoje várias vezes ao dia (por exemplo, a cada 3 horas).

Usamos esse pipeline de ETL para ajudar a Talent.com a processar 50,000 arquivos por dia contendo 5 milhões de registros e criamos dados de treinamento usando recursos extraídos de 90 dias de dados brutos da Talent.com – um total de 450 milhões de registros em 900,000 arquivos. Nosso pipeline ajudou a Talent.com a construir e implantar o sistema de recomendação em produção em apenas 2 semanas. A solução executou todos os processos de ML, incluindo ETL no Amazon SageMaker, sem utilizar outro serviço AWS. O sistema de recomendação de empregos gerou um aumento de 8.6% na taxa de cliques em testes A/B on-line em comparação com uma solução anterior baseada em XGBoost, ajudando a conectar milhões de usuários do Talent.com a empregos melhores.

Conclusão

Esta postagem descreve o pipeline de ETL que desenvolvemos para processamento de recursos para treinamento e implantação de um modelo de recomendação de empregos em Talent.com. Nosso pipeline usa trabalhos do SageMaker Processing para processamento eficiente de dados e extração de recursos em grande escala. O código de extração de recursos é implementado em Python, permitindo o uso de bibliotecas populares de ML para realizar a extração de recursos em escala, sem a necessidade de portar o código para usar o PySpark.

Encorajamos os leitores a explorar a possibilidade de usar o pipeline apresentado neste blog como um modelo para seus casos de uso onde a extração de recursos em escala é necessária. O pipeline pode ser aproveitado por um cientista de dados para construir um modelo de ML, e o mesmo pipeline pode então ser adotado por um engenheiro de ML para execução em produção. Isso pode reduzir significativamente o tempo necessário para produzir a solução de ML de ponta a ponta, como foi o caso da Talent.com. Os leitores podem consultar o tutorial para configurar e executar trabalhos de processamento do SageMaker. Também recomendamos aos leitores que vejam a postagem Do texto ao emprego dos sonhos: criando um recomendador de empregos baseado em PNL em Talent.com com Amazon SageMaker, onde discutimos técnicas de treinamento de modelo de aprendizagem profunda utilizando Amazon Sage Maker para construir o sistema de recomendação de empregos da Talent.com.


Sobre os autores

Dmitry BespalovDmitry Bespalov é cientista sênior aplicado no laboratório de soluções de aprendizado de máquina da Amazon, onde ajuda clientes da AWS em diferentes setores a acelerar a adoção de IA e nuvem.

Yi XiangYi Xiang é Cientista Aplicada II no Amazon Machine Learning Solutions Lab, onde ajuda clientes da AWS em diferentes setores a acelerar sua adoção de IA e nuvem.

Tong WangTong Wang é cientista sênior aplicado no laboratório de soluções de aprendizado de máquina da Amazon, onde ajuda clientes da AWS em diferentes setores a acelerar a adoção de IA e nuvem.

Anatoly KhomenkoAnatoly Khomenko é engenheiro sênior de aprendizado de máquina na talento.com com uma paixão pelo processamento de linguagem natural, combinando boas pessoas com bons empregos.

Abdenour BezzouhAbdenour Bezzouh é um executivo com mais de 25 anos de experiência construindo e entregando soluções tecnológicas que atendem milhões de clientes. Abdenour ocupou o cargo de Diretor de Tecnologia (CTO) na talento.com quando a equipe da AWS projetou e executou esta solução específica para talento.com.

Yanjun QiYanjun Qi é gerente sênior de ciência aplicada no laboratório de soluções de aprendizado de máquina da Amazon. Ela inova e aplica aprendizado de máquina para ajudar os clientes da AWS a acelerar a adoção de IA e nuvem.

Carimbo de hora:

Mais de Aprendizado de máquina da AWS