Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines

Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines

Amazon Sage Maker Studio può aiutarti a creare, addestrare, eseguire il debug, distribuire e monitorare i tuoi modelli e gestire i tuoi flussi di lavoro di machine learning (ML). Pipeline di Amazon SageMaker consente di costruire un piattaforma MLOps sicura, scalabile e flessibile all'interno dello Studio.

In questo post, spieghiamo come eseguire processi di elaborazione PySpark all'interno di una pipeline. Ciò consente a chiunque desideri addestrare un modello utilizzando Pipelines di pre-elaborare anche i dati di addestramento, post-elaborare i dati di inferenza o valutare i modelli utilizzando PySpark. Questa funzionalità è particolarmente rilevante quando è necessario elaborare dati su larga scala. Inoltre, mostriamo come ottimizzare i passaggi di PySpark utilizzando le configurazioni e i log dell'interfaccia utente di Spark.

Pipeline è un Amazon Sage Maker strumento per la creazione e la gestione di pipeline ML end-to-end. È un servizio on demand completamente gestito, integrato con SageMaker e altri servizi AWS, e quindi crea e gestisce le risorse per te. Ciò garantisce che le istanze vengano fornite e utilizzate solo durante l'esecuzione delle pipeline. Inoltre, Pipelines è supportato dal SDK Python di SageMaker, permettendoti di monitorare il tuo lignaggio di dati ed riutilizzare i passaggi memorizzandoli nella cache per ridurre tempi e costi di sviluppo. Una pipeline SageMaker può utilizzare fasi di lavorazione per elaborare i dati o eseguire la valutazione del modello.

Quando si elaborano dati su larga scala, i data scientist e gli ingegneri ML usano spesso PySpark, un'interfaccia per Apache Spark in Python. SageMaker fornisce immagini Docker predefinite che includono PySpark e altre dipendenze necessarie per eseguire processi di elaborazione dei dati distribuiti, comprese le trasformazioni dei dati e l'ingegnerizzazione delle funzionalità utilizzando il framework Spark. Sebbene tali immagini consentano di iniziare rapidamente a utilizzare PySpark nei processi di elaborazione, l'elaborazione dei dati su larga scala richiede spesso configurazioni Spark specifiche per ottimizzare l'elaborazione distribuita del cluster creato da SageMaker.

Nel nostro esempio, creiamo una pipeline SageMaker che esegue una singola fase di elaborazione. Per ulteriori informazioni su quali altri passaggi è possibile aggiungere a una pipeline, fare riferimento a Passaggi della pipeline.

Libreria di elaborazione SageMaker

SageMaker Processing può essere eseguito con specifiche quadri (ad esempio, SKlearnProcessor, PySparkProcessor o Hugging Face). Indipendentemente dal framework utilizzato, ciascuno Fase di elaborazione richiede quanto segue:

  • Nome del passo – Il nome da utilizzare per il passaggio della pipeline SageMaker
  • Argomenti passo – Gli argomenti per il tuo ProcessingStep

Inoltre, puoi fornire quanto segue:

  • La configurazione per la tua cache dei passaggi per evitare esecuzioni non necessarie del tuo passaggio in una pipeline SageMaker
  • Un elenco di nomi di passaggi, istanze di passaggi o istanze di raccolte di passaggi che il file ProcessingStep dipende
  • Il nome visualizzato del ProcessingStep
  • Una descrizione del ProcessingStep
  • File di proprietà
  • Riprova criteri

Gli argomenti sono consegnati al ProcessingStep. Puoi usare il sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class per eseguire l'applicazione Spark all'interno di un processo di elaborazione.

Ogni processore ha le proprie esigenze, a seconda del framework. Questo è meglio illustrato utilizzando il PySparkProcessor, dove puoi passare informazioni aggiuntive per ottimizzare il file ProcessingStep inoltre, ad esempio tramite il configuration parametro durante l'esecuzione del lavoro.

Esegui i lavori di SageMaker Processing in un ambiente sicuro

Suo le migliori pratiche per creare un Amazon VPC privato e configurarlo in modo che i tuoi processi non siano accessibili tramite Internet pubblico. I job SageMaker Processing consentono di specificare le sottoreti private e i gruppi di sicurezza nel VPC, nonché di abilitare l'isolamento della rete e la crittografia del traffico tra container utilizzando il NetworkConfig.VpcConfig parametro di richiesta del CreateProcessingJob API. Forniamo esempi di questa configurazione utilizzando il file SDK di SageMaker nella prossima sezione.

PySpark ProcessingStep all'interno delle pipeline SageMaker

Per questo esempio, supponiamo che tu abbia distribuito Studio in un ambiente sicuro già disponibile, inclusi VPC, endpoint VPC, gruppi di sicurezza, Gestione dell'identità e dell'accesso di AWS (IAM) ruoli e Servizio di gestione delle chiavi AWS (AWS KMS). Supponiamo inoltre che tu abbia due bucket: uno per artefatti come codice e log e uno per i tuoi dati. IL basic_infra.yaml file fornisce un esempio AWS CloudFormazione codice per eseguire il provisioning dell'infrastruttura prerequisita necessaria. Il codice di esempio e la guida alla distribuzione sono disponibili anche su GitHub.

Ad esempio, impostiamo una pipeline contenente un singolo ProcessingStep in cui stiamo semplicemente leggendo e scrivendo il set di dati abalone usando Scintilla. Gli esempi di codice mostrano come impostare e configurare il file ProcessingStep.

Definiamo i parametri per la pipeline (nome, ruolo, bucket e così via) e le impostazioni specifiche del passaggio (tipo e numero di istanze, versione del framework e così via). In questo esempio, utilizziamo una configurazione sicura e definiamo anche sottoreti, gruppi di sicurezza e crittografia del traffico tra container. Per questo esempio, è necessario un ruolo di esecuzione della pipeline con accesso completo a SageMaker e un VPC. Vedere il seguente codice:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Per dimostrare, il seguente esempio di codice esegue uno script PySpark su SageMaker Processing all'interno di una pipeline utilizzando il PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Come mostrato nel codice precedente, stiamo sovrascrivendo le configurazioni Spark predefinite fornendo configuration.json come ProcessingInput. Usiamo un file configuration.json file in cui è stato salvato Servizio di archiviazione semplice Amazon (Amazon S3) con le seguenti impostazioni:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Possiamo aggiornare la configurazione Spark predefinita passando il file come a ProcessingInput o utilizzando l'argomento di configurazione durante l'esecuzione di run() funzione.

La configurazione di Spark dipende da altre opzioni, come il tipo di istanza e il numero di istanze scelto per il processo di elaborazione. La prima considerazione è il numero di istanze, i core vCPU di ciascuna di tali istanze e la memoria dell'istanza. Puoi usare Interfaccia utente Spark or Parametri dell'istanza CloudWatch e registra per calibrare questi valori su più iterazioni di esecuzione.

Inoltre, le impostazioni dell'esecutore e del driver possono essere ulteriormente ottimizzate. Per un esempio di come calcolarli, fare riferimento a Best practice per gestire correttamente la memoria per le applicazioni Apache Spark su Amazon EMR.

Successivamente, per le impostazioni del driver e dell'executor, consigliamo di esaminare le impostazioni del committer per migliorare le prestazioni durante la scrittura su Amazon S3. Nel nostro caso, stiamo scrivendo i file Parquet su Amazon S3 e impostando "spark.sql.parquet.fs.optimized.comitter.optimization-enabled"A vero.

Se necessario per una connessione ad Amazon S3, un endpoint regionale "spark.hadoop.fs.s3a.endpoint” può essere specificato all'interno del file di configurazione.

In questa pipeline di esempio, lo script PySpark spark_process.py (come mostrato nel codice seguente) carica un file CSV da Amazon S3 in un frame di dati Spark e salva i dati come Parquet in Amazon S3.

Si noti che la nostra configurazione di esempio non è proporzionata al carico di lavoro perché la lettura e la scrittura del set di dati abalone potrebbe essere eseguita con le impostazioni predefinite su un'istanza. Le configurazioni che abbiamo menzionato dovrebbero essere definite in base alle tue esigenze specifiche.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Per approfondire l'ottimizzazione dei processi di elaborazione Spark, puoi utilizzare i log di CloudWatch e l'interfaccia utente di Spark. Puoi creare l'interfaccia utente Spark eseguendo un processo di elaborazione su un'istanza notebook SageMaker. Puoi visualizzare il Interfaccia utente Spark per i processi di elaborazione in esecuzione all'interno di una pipeline by eseguire il server della cronologia all'interno di un'istanza notebook SageMaker se i log dell'interfaccia utente Spark sono stati salvati nella stessa posizione Amazon S3.

ripulire

Se hai seguito l'esercitazione, è consigliabile eliminare le risorse che non vengono più utilizzate per evitare di incorrere in addebiti. Assicurati che eliminare lo stack di CloudFormation che hai utilizzato per creare le tue risorse. Ciò eliminerà lo stack creato e le risorse che ha creato.

Conclusione

In questo post, abbiamo mostrato come eseguire un processo di elaborazione SageMaker sicuro utilizzando PySpark all'interno di SageMaker Pipelines. Abbiamo anche dimostrato come ottimizzare PySpark utilizzando le configurazioni Spark e impostare il processo di elaborazione in modo che venga eseguito in una configurazione di rete sicura.

Come passaggio successivo, esplora come automatizzare l'intero ciclo di vita del modello e come i clienti hanno creato piattaforme MLOps sicure e scalabili utilizzando i servizi SageMaker.


Informazioni sugli autori

Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Maren Suilmann è un Data Scientist presso Servizi professionali AWS. Lavora con clienti di tutti i settori svelando il potere dell'AI/ML per raggiungere i loro risultati di business. Maren lavora con AWS da novembre 2019. Nel tempo libero si diverte a fare kickboxing, fare escursioni tra splendidi panorami e serate di giochi da tavolo.


Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Maira Ladeira Tanke
è uno specialista di ML presso AWS. Con un background nella scienza dei dati, ha 9 anni di esperienza nell'architettura e nella creazione di applicazioni ML con clienti di tutti i settori. In qualità di responsabile tecnico, aiuta i clienti ad accelerare il raggiungimento del valore aziendale attraverso tecnologie emergenti e soluzioni innovative. Nel tempo libero, Maira ama viaggiare e trascorrere del tempo con la sua famiglia in un luogo caldo.


Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Paolino Ting
è Data Scientist nel Servizi professionali AWS squadra. Supporta i clienti nel raggiungimento e nell'accelerazione dei loro risultati di business sviluppando soluzioni AI/ML. Nel suo tempo libero, a Pauline piace viaggiare, fare surf e provare nuovi dolci.


Esegui processi di elaborazione sicuri utilizzando PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Ricerca verticale. Ai.Donald Fossouo
è Sr Data Architect nel Servizi professionali AWS team, lavorando principalmente con Global Finance Service. Si impegna con i clienti per creare soluzioni innovative che risolvano i problemi aziendali dei clienti e accelerino l'adozione dei servizi AWS. Nel tempo libero, a Paperino piace leggere, correre e viaggiare.

Timestamp:

Di più da Apprendimento automatico di AWS