Execute trabalhos de processamento seguro usando o PySpark nos pipelines do Amazon SageMaker

Execute trabalhos de processamento seguro usando o PySpark nos pipelines do Amazon SageMaker

Estúdio Amazon SageMaker pode ajudá-lo a criar, treinar, depurar, implantar e monitorar seus modelos e gerenciar seus fluxos de trabalho de aprendizado de máquina (ML). Pipelines Amazon SageMaker permite construir um plataforma MLOps segura, escalável e flexível dentro do Estúdio.

Nesta postagem, explicamos como executar tarefas de processamento do PySpark em um pipeline. Isso permite que qualquer pessoa que queira treinar um modelo usando Pipelines também pré-processe dados de treinamento, pós-processe dados de inferência ou avalie modelos usando PySpark. Esse recurso é especialmente relevante quando você precisa processar dados em grande escala. Além disso, mostramos como otimizar suas etapas do PySpark usando configurações e logs da interface do usuário do Spark.

Pipelines é um Amazon Sage Maker ferramenta para criar e gerenciar pipelines de ML de ponta a ponta. É um serviço sob demanda totalmente gerenciado, integrado ao SageMaker e outros serviços da AWS e, portanto, cria e gerencia recursos para você. Isso garante que as instâncias sejam provisionadas e usadas apenas ao executar os pipelines. Além disso, Pipelines é suportado pelo SDK Python do SageMaker, permitindo que você acompanhe seu linhagem de dados e reutilizar etapas armazenando-os em cache para facilitar o tempo e o custo do desenvolvimento. Um pipeline do SageMaker pode usar etapas de processamento para processar dados ou realizar a avaliação do modelo.

Ao processar dados em grande escala, cientistas de dados e engenheiros de ML costumam usar PySparkGenericName, uma interface para Apache Spark em Python. O SageMaker fornece imagens Docker pré-criadas que incluem PySpark e outras dependências necessárias para executar trabalhos de processamento de dados distribuídos, incluindo transformações de dados e engenharia de recursos usando a estrutura Spark. Embora essas imagens permitam que você comece rapidamente a usar o PySpark em tarefas de processamento, o processamento de dados em grande escala geralmente requer configurações específicas do Spark para otimizar a computação distribuída do cluster criado pelo SageMaker.

Em nosso exemplo, criamos um pipeline do SageMaker executando uma única etapa de processamento. Para obter mais informações sobre quais outras etapas você pode adicionar a um pipeline, consulte Etapas do pipeline.

Biblioteca de processamento do SageMaker

O processamento do SageMaker pode ser executado com enquadramentos (por exemplo, SKlearnProcessor, PySparkProcessor ou Hugging Face). Independente da estrutura utilizada, cada Etapa de Processamento requer o seguinte:

  • Nome da etapa – O nome a ser usado para a etapa do pipeline do SageMaker
  • Argumentos de etapa – Os argumentos para o seu ProcessingStep

Além disso, você pode fornecer o seguinte:

  • A configuração do cache da etapa para evitar execuções desnecessárias da etapa em um pipeline do SageMaker
  • Uma lista de nomes de etapas, instâncias de etapas ou instâncias de coleção de etapas que o ProcessingStep depende
  • O nome de exibição do ProcessingStep
  • Uma descrição do ProcessingStep
  • Arquivos de propriedade
  • Políticas de repetição

Os argumentos são entregues ao ProcessingStep. Você pode usar o sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class para executar seu aplicativo Spark dentro de um trabalho de processamento.

Cada processador vem com suas próprias necessidades, dependendo da estrutura. Isso é melhor ilustrado usando o PySparkProcessor, onde você pode passar informações adicionais para otimizar o ProcessingStep ainda, por exemplo através do configuration parâmetro ao executar seu trabalho.

Execute trabalhos de processamento do SageMaker em um ambiente seguro

Está melhores práticas para criar um Amazon VPC privado e configurá-lo para que seus trabalhos não sejam acessíveis pela Internet pública. Os trabalhos de processamento do SageMaker permitem que você especifique as sub-redes privadas e os grupos de segurança em seu VPC, bem como habilite o isolamento de rede e a criptografia de tráfego entre contêineres usando o NetworkConfig.VpcConfig parâmetro de solicitação do CreateProcessingJob API. Fornecemos exemplos dessa configuração usando o SDK do SageMaker na próxima seção.

PySpark ProcessingStep nos pipelines do SageMaker

Para este exemplo, presumimos que você tenha o Studio implantado em um ambiente seguro já disponível, incluindo VPC, VPC endpoints, grupos de segurança, Gerenciamento de acesso e identidade da AWS funções (IAM) e Serviço de gerenciamento de chaves AWS (AWS KMS). Também presumimos que você tenha dois depósitos: um para artefatos como código e logs e outro para seus dados. O basic_infra.yaml arquivo fornece exemplo Formação da Nuvem AWS código para fornecer a infraestrutura de pré-requisito necessária. O código de exemplo e o guia de implantação também estão disponíveis em GitHub.

Como exemplo, configuramos um pipeline contendo um único ProcessingStep em que estamos simplesmente lendo e escrevendo o conjunto de dados abalone usando o Spark. Os exemplos de código mostram como instalar e configurar o ProcessingStep.

Definimos parâmetros para o pipeline (nome, função, depósitos e assim por diante) e configurações específicas da etapa (tipo e contagem de instância, versão da estrutura e assim por diante). Neste exemplo, usamos uma configuração segura e também definimos sub-redes, grupos de segurança e a criptografia de tráfego entre contêineres. Para este exemplo, você precisa de uma função de execução de pipeline com acesso total ao SageMaker e uma VPC. Veja o seguinte código:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Para demonstrar, o exemplo de código a seguir executa um script PySpark no SageMaker Processing em um pipeline usando o método PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Conforme mostrado no código anterior, estamos substituindo as configurações padrão do Spark fornecendo configuration.json como um ProcessingInput. Nós usamos um configuration.json arquivo que foi salvo em Serviço de armazenamento simples da Amazon (Amazon S3) com as seguintes configurações:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Podemos atualizar a configuração padrão do Spark passando o arquivo como um ProcessingInput ou usando o argumento de configuração ao executar o run() função.

A configuração do Spark depende de outras opções, como o tipo de instância e a contagem de instâncias escolhidas para o trabalho de processamento. A primeira consideração é o número de instâncias, os núcleos de vCPU que cada uma dessas instâncias possui e a memória da instância. Você pode usar IUs do Spark or Métricas de instância do CloudWatch e logs para calibrar esses valores em várias iterações de execução.

Além disso, as configurações do executor e do driver podem ser ainda mais otimizadas. Para obter um exemplo de como calculá-los, consulte Práticas recomendadas para gerenciar com êxito a memória de aplicativos Apache Spark no Amazon EMR.

Em seguida, para configurações de driver e executor, recomendamos investigar as configurações do committer para melhorar o desempenho ao gravar no Amazon S3. Em nosso caso, estamos gravando arquivos Parquet no Amazon S3 e definindo “spark.sql.parquet.fs.optimized.comitter.optimization-enabled”Para verdade.

Se necessário para uma conexão com o Amazon S3, um endpoint regional “spark.hadoop.fs.s3a.endpoint” pode ser especificado no arquivo de configurações.

Neste pipeline de exemplo, o script PySpark spark_process.py (conforme mostrado no código a seguir) carrega um arquivo CSV do Amazon S3 em um quadro de dados Spark e salva os dados como Parquet de volta no Amazon S3.

Observe que nossa configuração de exemplo não é proporcional à carga de trabalho porque a leitura e a gravação do conjunto de dados abalone podem ser feitas nas configurações padrão em uma instância. As configurações que mencionamos devem ser definidas com base em suas necessidades específicas.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Para se aprofundar na otimização dos trabalhos de processamento do Spark, você pode usar os logs do CloudWatch, bem como a interface do usuário do Spark. Você pode criar a interface do usuário do Spark executando um trabalho de processamento em uma instância de bloco de anotações do SageMaker. Você pode ver o Spark UI para os trabalhos de processamento em execução em um pipeline by executando o servidor de histórico em uma instância de bloco de anotações do SageMaker se os logs da interface do usuário do Spark foram salvos no mesmo local do Amazon S3.

limpar

Se você seguiu o tutorial, é uma boa prática excluir os recursos que não são mais usados ​​para evitar cobranças. Tenha certeza de excluir a pilha do CloudFormation que você usou para criar seus recursos. Isso excluirá a pilha criada, bem como os recursos que ela criou.

Conclusão

Nesta postagem, mostramos como executar um trabalho de processamento seguro do SageMaker usando o PySpark nos pipelines do SageMaker. Também demonstramos como otimizar o PySpark usando configurações do Spark e configurar seu trabalho de processamento para ser executado em uma configuração de rede segura.

Como próximo passo, explore como automatizar todo o ciclo de vida do modelo e como os clientes construíram plataformas MLOps seguras e escaláveis usando os serviços SageMaker.


Sobre os autores

Execute trabalhos de processamento seguro usando PySpark no Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Maren Suilmann é Cientista de Dados na Serviços Profissionais AWS. Ela trabalha com clientes de vários setores, revelando o poder da IA/ML para alcançar seus resultados de negócios. Maren está na AWS desde novembro de 2019. Em seu tempo livre, ela gosta de praticar kickboxing, fazer caminhadas com vistas incríveis e noites de jogos de tabuleiro.


Execute trabalhos de processamento seguro usando PySpark no Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Tanque Maira Ladeira
é especialista em ML da AWS. Com formação em ciência de dados, ela tem 9 anos de experiência na arquitetura e construção de aplicativos de ML com clientes de vários setores. Como líder técnica, ela ajuda os clientes a acelerar a obtenção de valor comercial por meio de tecnologias emergentes e soluções inovadoras. Em seu tempo livre, Maira gosta de viajar e passar o tempo com sua família em um lugar aconchegante.


Execute trabalhos de processamento seguro usando PySpark no Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Pauline Ting
é Cientista de Dados na Serviços Profissionais AWS equipe. Ela ajuda os clientes a alcançar e acelerar seus resultados de negócios desenvolvendo soluções de IA/ML. Em seu tempo livre, Pauline gosta de viajar, surfar e experimentar novas sobremesas.


Execute trabalhos de processamento seguro usando PySpark no Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pesquisa vertical. Ai.Donald Fossouo
é Arquiteto de Dados Sr no Serviços Profissionais AWS equipe, trabalhando principalmente com o Global Finance Service. Ele interage com os clientes para criar soluções inovadoras que abordam os problemas de negócios do cliente e aceleram a adoção dos serviços da AWS. Em seu tempo livre, Donald gosta de ler, correr e viajar.

Carimbo de hora:

Mais de Aprendizado de máquina da AWS