Rulați lucrări de procesare securizate folosind PySpark în Amazon SageMaker Pipelines

Rulați lucrări de procesare securizate folosind PySpark în Amazon SageMaker Pipelines

Amazon SageMaker Studio vă poate ajuta să construiți, să antrenezi, să depanați, să implementați și să vă monitorizați modelele și să vă gestionați fluxurile de lucru de învățare automată (ML). Pipelines Amazon SageMaker vă permite să construiți o platformă MLOps sigură, scalabilă și flexibilă în cadrul Studioului.

În această postare, explicăm cum să rulați joburi de procesare PySpark într-o conductă. Acest lucru permite oricui care dorește să antreneze un model folosind Pipelines să preproceseze și datele de antrenament, să postproceseze datele de inferență sau să evalueze modele folosind PySpark. Această capacitate este relevantă în special atunci când trebuie să procesați date la scară largă. În plus, vă prezentăm cum să vă optimizați pașii PySpark folosind configurații și jurnalele Spark UI.

Conducte este o Amazon SageMaker instrument pentru construirea și gestionarea conductelor ML end-to-end. Este un serviciu la cerere complet gestionat, integrat cu SageMaker și alte servicii AWS și, prin urmare, creează și gestionează resurse pentru dvs. Acest lucru asigură că instanțele sunt furnizate și utilizate numai atunci când rulează conductele. În plus, Pipelines este susținută de SageMaker Python SDK, permițându-vă să urmăriți dvs linia de date și pași de reutilizare prin memorarea lor în cache pentru a ușura timpul și costul de dezvoltare. Se poate folosi o conductă SageMaker etapele de prelucrare pentru a prelucra date sau a efectua evaluarea modelului.

Atunci când procesează date la scară largă, oamenii de știință de date și inginerii ML folosesc adesea PySpark, o interfață pentru Apache Spark în Python. SageMaker oferă imagini Docker prefabricate care includ PySpark și alte dependențe necesare pentru a rula lucrări distribuite de procesare a datelor, inclusiv transformări de date și inginerie de caracteristici folosind cadrul Spark. Deși acele imagini vă permit să începeți rapid să utilizați PySpark în lucrările de procesare, procesarea datelor la scară largă necesită adesea configurații specifice Spark pentru a optimiza calculul distribuit al cluster-ului creat de SageMaker.

În exemplul nostru, creăm o conductă SageMaker care rulează un singur pas de procesare. Pentru mai multe informații despre ce alți pași puteți adăuga la o conductă, consultați Etapele conductei.

Biblioteca de procesare SageMaker

SageMaker Processing poate rula cu anumite cadre (de exemplu, SKlearnProcessor, PySparkProcessor sau Hugging Face). Independent de cadrul utilizat, fiecare Pasul procesării necesită următoarele:

  • Numele pasului – Numele care va fi folosit pentru pasul conductei SageMaker
  • Argumente de pas – Argumentele pentru tine ProcessingStep

În plus, puteți furniza următoarele:

  • Configurația pentru cache-ul pasului pentru a evita rulările inutile ale pasului într-o conductă SageMaker
  • O listă de nume de pași, instanțe de pas sau instanțe de colecție de pași pe care ProcessingStep depinde
  • Numele afișat al ProcessingStep
  • O descriere a ProcessingStep
  • Fișiere de proprietate
  • Reîncercați politicile

Argumentele sunt predate celui ProcessingStep. Puteți utiliza sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor clasă pentru a rula aplicația Spark în interiorul unui job de procesare.

Fiecare procesor vine cu propriile nevoi, în funcție de cadru. Acest lucru este cel mai bine ilustrat folosind PySparkProcessor, unde puteți transmite informații suplimentare pentru a optimiza ProcessingStep mai departe, de exemplu prin intermediul configuration parametru atunci când rulați jobul.

Rulați lucrări SageMaker Processing într-un mediu securizat

este cele mai bune practici pentru a crea un Amazon VPC privat și a-l configura astfel încât locurile dvs. de muncă să nu fie accesibile prin internetul public. Lucrările de procesare SageMaker vă permit să specificați subrețelele private și grupurile de securitate din VPC-ul dvs., precum și să activați izolarea rețelei și criptarea traficului între containere folosind NetworkConfig.VpcConfig parametrul de solicitare al CreateProcessingJob API. Oferim exemple ale acestei configurații folosind SageMaker SDK în secțiunea următoare.

PySpark ProcessingStep din SageMaker Pipelines

Pentru acest exemplu, presupunem că aveți Studio implementat într-un mediu securizat deja disponibil, inclusiv VPC, puncte finale VPC, grupuri de securitate, Gestionarea identității și accesului AWS (IAM) roluri și AWS Service Management Service (AWS KMS). De asemenea, presupunem că aveți două compartimente: unul pentru artefacte precum codul și jurnalele și unul pentru datele dvs. The basic_infra.yaml fișierul oferă un exemplu Formarea AWS Cloud cod pentru a furniza infrastructura necesară. Exemplul de cod și ghidul de implementare sunt, de asemenea, disponibile pe GitHub.

De exemplu, am configurat o conductă care conține un singur ProcessingStep în care pur și simplu citim și scriem set de date abalone folosind Spark. Exemplele de cod vă arată cum să configurați și să configurați ProcessingStep.

Definim parametrii pentru conductă (nume, rol, compartimente și așa mai departe) și setări specifice etapelor (tipul și numărul de instanțe, versiunea cadru și așa mai departe). În acest exemplu, folosim o configurare sigură și definim, de asemenea, subrețele, grupurile de securitate și criptarea traficului între containere. Pentru acest exemplu, aveți nevoie de un rol de execuție pipeline cu acces complet SageMaker și un VPC. Vezi următorul cod:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Pentru a demonstra, următorul exemplu de cod rulează un script PySpark pe SageMaker Processing într-o conductă folosind PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

După cum se arată în codul precedent, suprascriem configurațiile implicite Spark furnizând configuration.json ca o ProcessingInput. Folosim a configuration.json fișier în care a fost salvat Serviciul Amazon de stocare simplă (Amazon S3) cu următoarele setări:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Putem actualiza configurația implicită Spark fie prin trecerea fișierului ca a ProcessingInput sau folosind argumentul de configurare când rulează run() Funcția.

Configurația Spark depinde de alte opțiuni, cum ar fi tipul instanței și numărul de instanțe alese pentru jobul de procesare. Prima considerație este numărul de instanțe, nucleele vCPU pe care le are fiecare dintre acele instanțe și memoria instanței. Poți să folosești Spark UI-uri or Valori ale instanței CloudWatch și înregistrează pentru a calibra aceste valori pe mai multe iterații de rulare.

În plus, setările pentru executor și driver pot fi optimizate și mai mult. Pentru un exemplu de calcul al acestora, consultați Cele mai bune practici pentru gestionarea cu succes a memoriei pentru aplicațiile Apache Spark pe Amazon EMR.

În continuare, pentru setările driverului și executorului, vă recomandăm să investigați setările committerului pentru a îmbunătăți performanța atunci când scrieți pe Amazon S3. În cazul nostru, scriem fișiere Parquet pe Amazon S3 și setăm „spark.sql.parquet.fs.optimized.comitter.optimization-enabled”La adevărat.

Dacă este necesar pentru o conexiune la Amazon S3, un punct final regional „spark.hadoop.fs.s3a.endpoint” poate fi specificat în fișierul de configurare.

În acest exemplu de conductă, scriptul PySpark spark_process.py (după cum se arată în codul următor) încarcă un fișier CSV din Amazon S3 într-un cadru de date Spark și salvează datele ca Parquet înapoi în Amazon S3.

Rețineți că exemplul nostru de configurație nu este proporțional cu volumul de lucru, deoarece citirea și scrierea setului de date abalone ar putea fi efectuată cu setările implicite pe o singură instanță. Configurațiile pe care le-am menționat ar trebui să fie definite în funcție de nevoile dumneavoastră specifice.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Pentru a vă scufunda în optimizarea lucrărilor de procesare Spark, puteți utiliza jurnalele CloudWatch, precum și interfața de utilizare Spark. Puteți crea interfața de utilizare Spark rulând o lucrare de procesare pe o instanță de notebook SageMaker. Puteți vizualiza Spark UI pentru joburile de procesare care rulează într-o conductă by rulează serverul de istoric într-o instanță de notebook SageMaker dacă jurnalele Spark UI au fost salvate în aceeași locație Amazon S3.

A curăța

Dacă ați urmat tutorialul, este o practică bună să ștergeți resursele care nu mai sunt folosite pentru a nu mai suporta costuri. Asigura-te ca ștergeți stiva CloudFormation pe care le-ați folosit pentru a vă crea resursele. Aceasta va șterge stiva creată, precum și resursele pe care le-a creat.

Concluzie

În această postare, am arătat cum să rulați o lucrare de procesare SageMaker securizată folosind PySpark în cadrul SageMaker Pipelines. De asemenea, am demonstrat cum să optimizați PySpark utilizând configurațiile Spark și să configurați jobul Procesare pentru a rula într-o configurație de rețea sigură.

Ca pas următor, explorați cum să automatizați întregul ciclu de viață al modelului și cum clienții au construit platforme MLOps sigure și scalabile folosind serviciile SageMaker.


Despre Autori

Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maren Suilmann este Data Scientist la Servicii profesionale AWS. Ea lucrează cu clienți din diverse industrii, dezvăluind puterea AI/ML pentru a-și atinge rezultatele de afaceri. Maren lucrează la AWS din noiembrie 2019. În timpul liber, îi place să facă kickboxing, să facă drumeții la priveliști minunate și nopți cu jocuri de societate.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maira Ladeira Tanke
este specialist ML la AWS. Cu experiență în știința datelor, ea are 9 ani de experiență în arhitectura și construirea de aplicații ML cu clienți din diverse industrii. În calitate de lider tehnic, ea ajută clienții să-și accelereze atingerea valorii afacerii prin tehnologii emergente și soluții inovatoare. În timpul liber, Mairei îi place să călătorească și să petreacă timpul cu familia ei într-un loc cald.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pauline Ting
este Data Scientist în Servicii profesionale AWS echipă. Ea sprijină clienții în atingerea și accelerarea rezultatelor lor de afaceri prin dezvoltarea de soluții AI/ML. În timpul liber, Paulinei îi place să călătorească, să facă surf și să încerce noi locuri de desert.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Donald Fossouo
este arhitect senior de date în Servicii profesionale AWS echipa, care lucrează în mare parte cu Global Finance Service. El se angajează cu clienții pentru a crea soluții inovatoare care abordează problemele de afaceri ale clienților și accelerează adoptarea serviciilor AWS. În timpul liber, lui Donald îi place să citească, să alerge și să călătorească.

Timestamp-ul:

Mai mult de la Învățare automată AWS