Ejecute trabajos de procesamiento seguro con PySpark en Amazon SageMaker Pipelines

Ejecute trabajos de procesamiento seguro con PySpark en Amazon SageMaker Pipelines

Estudio Amazon SageMaker puede ayudarlo a crear, entrenar, depurar, implementar y monitorear sus modelos y administrar sus flujos de trabajo de aprendizaje automático (ML). Canalizaciones de Amazon SageMaker le permite construir un plataforma MLOps segura, escalable y flexible dentro de Estudio.

En esta publicación, explicamos cómo ejecutar trabajos de procesamiento de PySpark dentro de una canalización. Esto permite que cualquiera que quiera entrenar un modelo usando Pipelines también preprocesar datos de entrenamiento, posprocesar datos de inferencia o evaluar modelos usando PySpark. Esta capacidad es especialmente relevante cuando necesita procesar datos a gran escala. Además, mostramos cómo optimizar los pasos de PySpark mediante configuraciones y registros de la interfaz de usuario de Spark.

Los oleoductos es un Amazon SageMaker herramienta para construir y administrar canalizaciones de ML de extremo a extremo. Es un servicio bajo demanda completamente administrado, integrado con SageMaker y otros servicios de AWS y, por lo tanto, crea y administra recursos para usted. Esto garantiza que las instancias solo se aprovisionen y utilicen cuando se ejecutan las canalizaciones. Además, Pipelines cuenta con el apoyo de la SDK de SageMaker Python, lo que le permite realizar un seguimiento de su linaje de datos y reutilizar pasos almacenándolos en caché para facilitar el tiempo y el costo de desarrollo. Una canalización de SageMaker puede usar pasos de procesamiento para procesar datos o realizar la evaluación del modelo.

Al procesar datos a gran escala, los científicos de datos y los ingenieros de ML a menudo usan PySpark, una interfaz para Apache Spark en Python. SageMaker proporciona imágenes de Docker prediseñadas que incluyen PySpark y otras dependencias necesarias para ejecutar trabajos de procesamiento de datos distribuidos, incluidas las transformaciones de datos y la ingeniería de funciones mediante el marco Spark. Si bien esas imágenes le permiten comenzar a usar PySpark rápidamente en el procesamiento de trabajos, el procesamiento de datos a gran escala a menudo requiere configuraciones específicas de Spark para optimizar la computación distribuida del clúster creado por SageMaker.

En nuestro ejemplo, creamos una canalización de SageMaker que ejecuta un solo paso de procesamiento. Para obtener más información sobre qué otros pasos puede agregar a una canalización, consulte Pasos de canalización.

Biblioteca de procesamiento de SageMaker

SageMaker Processing puede ejecutarse con determinados marcos (por ejemplo, SKlearnProcessor, PySparkProcessor o Hugging Face). Independientemente del marco utilizado, cada Paso de procesamiento requiere lo siguiente:

  • Nombre del paso – El nombre que se usará para el paso de canalización de SageMaker
  • Argumentos de paso – Los argumentos a favor de su ProcessingStep

Además, puede proporcionar lo siguiente:

  • La configuración de su caché de pasos para evitar ejecuciones innecesarias de su paso en una canalización de SageMaker
  • Una lista de nombres de pasos, instancias de pasos o instancias de colección de pasos que el ProcessingStep depende
  • El nombre para mostrar del ProcessingStep
  • Una descripción de la ProcessingStep
  • Archivos de propiedad
  • Políticas de reintento

Los argumentos se entregan al ProcessingStep. Puede utilizar el sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcesador class para ejecutar su aplicación Spark dentro de un trabajo de procesamiento.

Cada procesador viene con sus propias necesidades, dependiendo del marco. Esto se ilustra mejor usando el PySparkProcessor, donde puede pasar información adicional para optimizar la ProcessingStep más, por ejemplo a través de la configuration parámetro al ejecutar su trabajo.

Ejecute trabajos de procesamiento de SageMaker en un entorno seguro

Han pasado las mejores prácticas para crear una Amazon VPC privada y configurarla para que no se pueda acceder a sus trabajos a través de la Internet pública. Los trabajos de procesamiento de SageMaker le permiten especificar las subredes privadas y los grupos de seguridad en su VPC, así como habilitar el aislamiento de la red y el cifrado del tráfico entre contenedores mediante el NetworkConfig.VpcConfig parámetro de solicitud del CreateProcessingJob API. Proporcionamos ejemplos de esta configuración usando el SDK de SageMaker en la siguiente sección.

Paso de procesamiento de PySpark dentro de las canalizaciones de SageMaker

Para este ejemplo, asumimos que tiene Studio implementado en un entorno seguro ya disponible, incluidos VPC, puntos finales de VPC, grupos de seguridad, Gestión de identidades y accesos de AWS (IAM) roles, y Servicio de administración de claves de AWS (AWS KMS) claves. También asumimos que tiene dos cubos: uno para artefactos como código y registros, y otro para sus datos. El basic_infra.yaml archivo proporciona ejemplo Formación en la nube de AWS código para aprovisionar la infraestructura de requisitos previos necesaria. El código de ejemplo y la guía de implementación también están disponibles en GitHub.

Como ejemplo, configuramos una canalización que contiene un solo ProcessingStep en el que simplemente estamos leyendo y escribiendo el conjunto de datos de abulón usando Chispa. Los ejemplos de código le muestran cómo instalar y configurar el ProcessingStep.

Definimos parámetros para la canalización (nombre, función, cubos, etc.) y configuraciones específicas de cada paso (tipo y número de instancia, versión del marco, etc.). En este ejemplo, usamos una configuración segura y también definimos subredes, grupos de seguridad y el cifrado de tráfico entre contenedores. Para este ejemplo, necesita un rol de ejecución de canalización con acceso completo a SageMaker y una VPC. Ver el siguiente código:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Para demostrarlo, el siguiente ejemplo de código ejecuta una secuencia de comandos de PySpark en SageMaker Processing dentro de una canalización mediante el PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Como se muestra en el código anterior, estamos sobrescribiendo las configuraciones predeterminadas de Spark proporcionando configuration.json como herramienta de edición del ProcessingInput. Usamos un configuration.json archivo que se guardó en Servicio de almacenamiento simple de Amazon (Amazon S3) con la siguiente configuración:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Podemos actualizar la configuración predeterminada de Spark pasando el archivo como ProcessingInput o utilizando el argumento de configuración al ejecutar el run() función.

La configuración de Spark depende de otras opciones, como el tipo de instancia y el recuento de instancias elegido para el trabajo de procesamiento. La primera consideración es la cantidad de instancias, los núcleos de vCPU que tiene cada una de esas instancias y la memoria de la instancia. Puedes usar IU de Spark or Métricas de instancias de CloudWatch y registros para calibrar estos valores en varias iteraciones de ejecución.

Además, la configuración del ejecutor y del controlador se puede optimizar aún más. Para ver un ejemplo de cómo calcularlos, consulte Prácticas recomendadas para administrar correctamente la memoria para aplicaciones Apache Spark en Amazon EMR.

A continuación, para la configuración del controlador y el ejecutor, recomendamos investigar la configuración del confirmador para mejorar el rendimiento al escribir en Amazon S3. En nuestro caso, estamos escribiendo archivos de Parquet en Amazon S3 y configurando "spark.sql.parquet.fs.optimized.comitter.optimization-enabled”A verdadero.

Si es necesario para una conexión a Amazon S3, un punto final regional "spark.hadoop.fs.s3a.endpoint” se puede especificar dentro del archivo de configuración.

En esta canalización de ejemplo, el script de PySpark spark_process.py (como se muestra en el siguiente código) carga un archivo CSV de Amazon S3 en un marco de datos de Spark y guarda los datos como Parquet en Amazon S3.

Tenga en cuenta que nuestra configuración de ejemplo no es proporcional a la carga de trabajo porque la lectura y escritura del conjunto de datos de abulón se puede realizar con la configuración predeterminada en una instancia. Las configuraciones que mencionamos deben definirse en función de sus necesidades específicas.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Para profundizar en la optimización de los trabajos de procesamiento de Spark, puede utilizar los registros de CloudWatch y la interfaz de usuario de Spark. Puede crear la interfaz de usuario de Spark ejecutando un trabajo de procesamiento en una instancia de notebook de SageMaker. Puedes ver el Spark UI para los trabajos de procesamiento que se ejecutan dentro de una canalización by ejecutando el servidor de historial dentro de una instancia de notebook de SageMaker si los registros de la interfaz de usuario de Spark se guardaron en la misma ubicación de Amazon S3.

Limpiar

Si siguió el tutorial, es una buena práctica eliminar los recursos que ya no se usan para dejar de incurrir en cargos. Asegurate que eliminar la pila de CloudFormation que utilizó para crear sus recursos. Esto eliminará la pila creada, así como los recursos que creó.

Conclusión

En esta publicación, mostramos cómo ejecutar un trabajo de procesamiento de SageMaker seguro usando PySpark dentro de SageMaker Pipelines. También demostramos cómo optimizar PySpark usando configuraciones de Spark y configurar su trabajo de procesamiento para que se ejecute en una configuración de red segura.

Como siguiente paso, explore cómo automatizar todo el ciclo de vida del modelo y cómo los clientes crearon plataformas MLOps seguras y escalables utilizando los servicios de SageMaker.


Acerca de los autores

Ejecute trabajos de procesamiento seguros utilizando PySpark en Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.maren suilmann es científico de datos en Servicios profesionales de AWS. Trabaja con clientes de todas las industrias para revelar el poder de AI/ML para lograr sus resultados comerciales. Maren ha estado con AWS desde noviembre de 2019. En su tiempo libre, disfruta del kickboxing, las caminatas hacia excelentes vistas y las noches de juegos de mesa.


Ejecute trabajos de procesamiento seguros utilizando PySpark en Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Maira Ladeira Tanke
es un especialista en ML en AWS. Con experiencia en ciencia de datos, tiene 9 años de experiencia en la arquitectura y creación de aplicaciones ML con clientes de todas las industrias. Como líder técnica, ayuda a los clientes a acelerar su logro de valor comercial a través de tecnologías emergentes y soluciones innovadoras. En su tiempo libre, a Maira le gusta viajar y pasar tiempo con su familia en un lugar cálido.


Ejecute trabajos de procesamiento seguros utilizando PySpark en Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.paulina
es científico de datos en el Servicios profesionales de AWS equipo. Ella ayuda a los clientes a lograr y acelerar sus resultados comerciales mediante el desarrollo de soluciones AI/ML. En su tiempo libre, a Pauline le gusta viajar, surfear y probar nuevos postres.


Ejecute trabajos de procesamiento seguros utilizando PySpark en Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Búsqueda vertical. Ai.Donald Fossouo
es Arquitecto de Datos Sr en el Servicios profesionales de AWS equipo, en su mayoría trabajando con Global Finance Service. Se relaciona con los clientes para crear soluciones innovadoras que aborden los problemas comerciales de los clientes y aceleren la adopción de los servicios de AWS. En su tiempo libre, a Donald le gusta leer, correr y viajar.

Sello de tiempo:

Mas de Aprendizaje automático de AWS