Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines

Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines

Amazon SageMakerStudio peut vous aider à créer, former, déboguer, déployer et surveiller vos modèles et à gérer vos flux de travail d'apprentissage automatique (ML). Pipelines Amazon SageMaker vous permet de construire un plate-forme MLOps sécurisée, évolutive et flexible au sein de Studio.

Dans cet article, nous expliquons comment exécuter des tâches de traitement PySpark dans un pipeline. Cela permet à quiconque souhaite former un modèle à l'aide de Pipelines de prétraiter également les données de formation, de post-traiter les données d'inférence ou d'évaluer des modèles à l'aide de PySpark. Cette capacité est particulièrement pertinente lorsque vous devez traiter des données à grande échelle. De plus, nous montrons comment optimiser vos étapes PySpark à l'aide de configurations et de journaux d'interface utilisateur Spark.

Les pipelines sont un Amazon Sage Maker outil pour créer et gérer des pipelines ML de bout en bout. Il s'agit d'un service à la demande entièrement géré, intégré à SageMaker et à d'autres services AWS, qui crée et gère donc les ressources pour vous. Cela garantit que les instances ne sont provisionnées et utilisées que lors de l'exécution des pipelines. De plus, Pipelines est soutenu par le Kit de développement logiciel (SDK) SageMaker Python, vous permettant de suivre votre lignage de données et les réutiliser les étapes en les mettant en cache pour réduire le temps et les coûts de développement. Un pipeline SageMaker peut utiliser étapes de traitement pour traiter des données ou effectuer une évaluation de modèle.

Lors du traitement de données à grande échelle, les data scientists et les ingénieurs ML utilisent souvent PySparkName, une interface pour Apache Spark en Python. SageMaker fournit des images Docker prédéfinies qui incluent PySpark et d'autres dépendances nécessaires pour exécuter des tâches de traitement de données distribuées, y compris des transformations de données et l'ingénierie de fonctionnalités à l'aide du framework Spark. Bien que ces images vous permettent de commencer rapidement à utiliser PySpark dans le traitement des tâches, le traitement de données à grande échelle nécessite souvent des configurations Spark spécifiques afin d'optimiser le calcul distribué du cluster créé par SageMaker.

Dans notre exemple, nous créons un pipeline SageMaker exécutant une seule étape de traitement. Pour plus d'informations sur les autres étapes que vous pouvez ajouter à un pipeline, reportez-vous à Étapes du pipeline.

Bibliothèque de traitement SageMaker

SageMaker Processing peut s'exécuter avec des cadres (par exemple, SKlearnProcessor, PySparkProcessor ou Hugging Face). Indépendamment du cadre utilisé, chaque TraitementStep nécessite ce qui suit :

  • Nom de l'étape – Le nom à utiliser pour votre étape de pipeline SageMaker
  • Arguments d'étape – Les arguments de votre ProcessingStep

De plus, vous pouvez fournir les éléments suivants :

  • La configuration de votre cache d'étape afin d'éviter des exécutions inutiles de votre étape dans un pipeline SageMaker
  • Une liste de noms d'étapes, d'instances d'étapes ou d'instances de collections d'étapes que ProcessingStep dépend
  • Le nom d'affichage du ProcessingStep
  • Une description du ProcessingStep
  • Fichiers de propriétés
  • Stratégies de nouvelle tentative

Les arguments sont remis au ProcessingStep. Vous pouvez utiliser le sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class pour exécuter votre application Spark dans une tâche de traitement.

Chaque processeur a ses propres besoins, selon le framework. Ceci est mieux illustré en utilisant le PySparkProcessor, où vous pouvez transmettre des informations supplémentaires pour optimiser ProcessingStep en outre, par exemple via le configuration paramètre lors de l'exécution de votre travail.

Exécutez les tâches de traitement SageMaker dans un environnement sécurisé

Il s'agit d'avoir un lien direct avec le cœur des opérations de votre meilleures pratiques pour créer un Amazon VPC privé et le configurer afin que vos tâches ne soient pas accessibles sur l'Internet public. Les tâches de traitement SageMaker vous permettent de spécifier les sous-réseaux privés et les groupes de sécurité dans votre VPC, ainsi que d'activer l'isolation du réseau et le chiffrement du trafic inter-conteneurs à l'aide du NetworkConfig.VpcConfig paramètre de requête du CreateProcessingJob API. Nous donnons des exemples de cette configuration en utilisant le SDK SageMaker dans la section suivante.

Étape de traitement PySpark dans les pipelines SageMaker

Pour cet exemple, nous supposons que vous avez déployé Studio dans un environnement sécurisé déjà disponible, y compris VPC, points de terminaison VPC, groupes de sécurité, Gestion des identités et des accès AWS (IAM) rôles, et Service de gestion des clés AWS (AWS KMS). Nous supposons également que vous avez deux compartiments : un pour les artefacts tels que le code et les journaux, et un pour vos données. Le basic_infra.yaml le fichier fournit un exemple AWS CloudFormation code pour provisionner l'infrastructure prérequise nécessaire. L'exemple de code et le guide de déploiement sont également disponibles sur GitHub.

A titre d'exemple, nous mettons en place un pipeline contenant un seul ProcessingStep dans lequel nous lisons et écrivons simplement le ensemble de données abalone en utilisant Spark. Les exemples de code vous montrent comment installer et configurer le ProcessingStep.

Nous définissons des paramètres pour le pipeline (nom, rôle, compartiments, etc.) et des paramètres spécifiques à l'étape (type et nombre d'instances, version du framework, etc.). Dans cet exemple, nous utilisons une configuration sécurisée et définissons également des sous-réseaux, des groupes de sécurité et le chiffrement du trafic inter-conteneurs. Pour cet exemple, vous avez besoin d'un rôle d'exécution de pipeline avec un accès complet à SageMaker et un VPC. Voir le code suivant :

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Pour illustrer, l'exemple de code suivant exécute un script PySpark sur SageMaker Processing dans un pipeline en utilisant le PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Comme indiqué dans le code précédent, nous écrasons les configurations Spark par défaut en fournissant configuration.json en tant que ProcessingInput. Nous utilisons un configuration.json fichier enregistré dans Service de stockage simple Amazon (Amazon S3) avec les paramètres suivants :

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Nous pouvons mettre à jour la configuration par défaut de Spark soit en transmettant le fichier en tant que ProcessingInput ou en utilisant l'argument de configuration lors de l'exécution du run() la fonction.

La configuration de Spark dépend d'autres options, telles que le type d'instance et le nombre d'instances choisis pour la tâche de traitement. La première considération est le nombre d'instances, les cœurs vCPU de chacune de ces instances et la mémoire de l'instance. Vous pouvez utiliser Interfaces utilisateur Spark or Métriques d'instance CloudWatch et des journaux pour calibrer ces valeurs sur plusieurs itérations d'exécution.

De plus, les paramètres de l'exécuteur et du pilote peuvent être encore optimisés. Pour un exemple de calcul de ceux-ci, reportez-vous à Meilleures pratiques pour gérer avec succès la mémoire des applications Apache Spark sur Amazon EMR.

Ensuite, pour les paramètres du pilote et de l'exécuteur, nous vous recommandons d'examiner les paramètres du committer afin d'améliorer les performances lors de l'écriture sur Amazon S3. Dans notre cas, nous écrivons des fichiers Parquet sur Amazon S3 et définissons "spark.sql.parquet.fs.optimized.comitter.optimization-enabled” à vrai.

Si nécessaire pour une connexion à Amazon S3, un point de terminaison régional "spark.hadoop.fs.s3a.endpoint” peut être spécifié dans le fichier de configuration.

Dans cet exemple de pipeline, le script PySpark spark_process.py (comme illustré dans le code suivant) charge un fichier CSV d'Amazon S3 dans une trame de données Spark et enregistre les données en tant que Parquet dans Amazon S3.

Notez que notre exemple de configuration n'est pas proportionnel à la charge de travail car la lecture et l'écriture de l'ensemble de données abalone peuvent être effectuées sur les paramètres par défaut sur une instance. Les configurations que nous avons mentionnées doivent être définies en fonction de vos besoins spécifiques.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Pour plonger dans l'optimisation des tâches de traitement Spark, vous pouvez utiliser les journaux CloudWatch ainsi que l'interface utilisateur Spark. Vous pouvez créer l'interface utilisateur Spark en exécutant une tâche de traitement sur une instance de bloc-notes SageMaker. Vous pouvez visualiser le Interface utilisateur Spark pour les tâches de traitement exécutées dans un pipeline by exécuter le serveur d'historique dans une instance de bloc-notes SageMaker si les journaux de l'interface utilisateur Spark ont ​​été enregistrés dans le même emplacement Amazon S3.

Nettoyer

Si vous avez suivi le didacticiel, il est recommandé de supprimer les ressources qui ne sont plus utilisées pour ne plus générer de frais. Assurez-vous de supprimer la pile CloudFormation que vous avez utilisé pour créer vos ressources. Cela supprimera la pile créée ainsi que les ressources qu'elle a créées.

Conclusion

Dans cet article, nous avons montré comment exécuter une tâche de traitement SageMaker sécurisée à l'aide de PySpark dans SageMaker Pipelines. Nous avons également montré comment optimiser PySpark à l'aide des configurations Spark et configurer votre tâche de traitement pour qu'elle s'exécute dans une configuration réseau sécurisée.

Dans une prochaine étape, découvrez comment automatiser l'ensemble du cycle de vie du modèle et comment les clients ont construit des plateformes MLOps sécurisées et évolutives à l'aide des services SageMaker.


À propos des auteurs

Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Maren Suilmann est Data Scientist chez Services professionnels AWS. Elle travaille avec des clients de tous les secteurs pour dévoiler la puissance de l'IA/ML pour atteindre leurs résultats commerciaux. Maren est chez AWS depuis novembre 2019. Dans ses temps libres, elle aime le kickboxing, la randonnée vers de superbes vues et les soirées jeux de société.


Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Maira Ladeira Tanké
est spécialiste ML chez AWS. Avec une formation en science des données, elle a 9 ans d'expérience dans l'architecture et la création d'applications ML avec des clients de tous les secteurs. En tant que responsable technique, elle aide les clients à accélérer la réalisation de leur valeur commerciale grâce aux technologies émergentes et aux solutions innovantes. Dans ses temps libres, Maira aime voyager et passer du temps avec sa famille dans un endroit chaleureux.


Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Pauline Ting
est Data Scientist dans le Services professionnels AWS équipe. Elle aide les clients à atteindre et à accélérer leurs résultats commerciaux en développant des solutions d'IA/ML. Dans ses temps libres, Pauline aime voyager, surfer et essayer de nouveaux desserts.


Exécutez des tâches de traitement sécurisées à l'aide de PySpark dans Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Recherche verticale. Aï.Donald Fossouo
est un architecte de données senior dans le Services professionnels AWS équipe, travaillant principalement avec Global Finance Service. Il collabore avec les clients pour créer des solutions innovantes qui répondent aux problèmes commerciaux des clients et accélèrent l'adoption des services AWS. Dans ses temps libres, Donald aime lire, courir et voyager.

Horodatage:

Plus de Apprentissage automatique AWS