Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines

Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines

Amazon SageMaker Studio kan hjälpa dig att bygga, träna, felsöka, distribuera och övervaka dina modeller och hantera dina arbetsflöden för maskininlärning (ML). Amazon SageMaker-rörledningar gör det möjligt för dig att bygga en säker, skalbar och flexibel MLOps-plattform i Studio.

I det här inlägget förklarar vi hur man kör PySpark-bearbetningsjobb inom en pipeline. Detta gör det möjligt för alla som vill träna en modell med Pipelines att också förbehandla träningsdata, efterbehandla slutledningsdata eller utvärdera modeller med PySpark. Denna förmåga är särskilt relevant när du behöver bearbeta storskalig data. Dessutom visar vi hur du optimerar dina PySpark-steg med hjälp av konfigurationer och Spark UI-loggar.

Pipelines är en Amazon SageMaker verktyg för att bygga och hantera end-to-end ML-pipelines. Det är en helt hanterad on-demand-tjänst, integrerad med SageMaker och andra AWS-tjänster, och skapar och hanterar därför resurser åt dig. Detta säkerställer att instanser endast tillhandahålls och används när pipelines körs. Dessutom stöds Pipelines av SageMaker Python SDK, så att du kan spåra din data härstamning och återanvändningssteg genom att cachelagra dem för att underlätta utvecklingstid och kostnad. En SageMaker pipeline kan använda bearbetningssteg att bearbeta data eller utföra modellutvärdering.

Vid bearbetning av storskalig data använder datavetare och ML-ingenjörer ofta PySpark, ett gränssnitt för Apache Spark i Python. SageMaker tillhandahåller förbyggda Docker-bilder som inkluderar PySpark och andra beroenden som behövs för att köra distribuerade databearbetningsjobb, inklusive datatransformationer och funktionsteknik med Spark-ramverket. Även om dessa bilder gör att du snabbt kan börja använda PySpark i bearbetningsjobb, kräver storskalig databehandling ofta specifika Spark-konfigurationer för att optimera den distribuerade beräkningen av klustret som skapats av SageMaker.

I vårt exempel skapar vi en SageMaker-pipeline som kör ett enda bearbetningssteg. För mer information om vilka andra steg du kan lägga till i en pipeline, se Pipeline Steg.

SageMaker Processing-bibliotek

SageMaker Processing kan köras med specifika ramar (till exempel SKlearnProcessor, PySparkProcessor eller Hugging Face). Oberoende av vilket ramverk som används, var och en Bearbetning Steg kräver följande:

  • Stegets namn – Namnet som ska användas för ditt SageMaker pipelinesteg
  • Stegargument – Argumenten för din ProcessingStep

Dessutom kan du tillhandahålla följande:

  • Konfigurationen för din stegcache för att undvika onödiga körningar av ditt steg i en SageMaker-pipeline
  • En lista över stegnamn, steginstanser eller stegsamlingsinstanser som ProcessingStep beror på
  • Visningsnamnet för ProcessingStep
  • En beskrivning av ProcessingStep
  • Fastighetsfiler
  • Försök igen policyer

Argumenten överlämnas till ProcessingStep. Du kan använda sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor klass för att köra din Spark-applikation i ett bearbetningsjobb.

Varje processor kommer med sina egna behov, beroende på ramverket. Detta illustreras bäst med hjälp av PySparkProcessor, där du kan skicka ytterligare information för att optimera ProcessingStep vidare, till exempel via configuration parameter när du kör ditt jobb.

Kör SageMaker Processing-jobb i en säker miljö

Det är bästa praxis att skapa en privat Amazon VPC och konfigurera den så att dina jobb inte är tillgängliga över det offentliga internet. SageMaker Processing-jobb låter dig specificera de privata undernäten och säkerhetsgrupperna i din VPC samt aktivera nätverksisolering och kryptering av intercontainertrafik med hjälp av NetworkConfig.VpcConfig begäran parameter för CreateProcessingJob API. Vi ger exempel på denna konfiguration med hjälp av SageMaker SDK i nästa avsnitt.

PySpark ProcessingStep inom SageMaker Pipelines

För det här exemplet antar vi att du har Studio utplacerat i en säker miljö som redan är tillgänglig, inklusive VPC, VPC-slutpunkter, säkerhetsgrupper, AWS identitets- och åtkomsthantering (IAM) roller och AWS nyckelhanteringstjänst (AWS KMS)-tangenter. Vi antar också att du har två hinkar: en för artefakter som kod och loggar, och en för dina data. De basic_infra.yaml filen ger exempel AWS molnformation kod för att tillhandahålla den nödvändiga infrastrukturen. Exempelkoden och distributionsguiden finns också tillgänglig på GitHub.

Som ett exempel satte vi upp en pipeline som innehåller en singel ProcessingStep där vi helt enkelt läser och skriver abalone dataset använder Spark. Kodexemplen visar hur du ställer in och konfigurerar ProcessingStep.

Vi definierar parametrar för pipeline (namn, roll, buckets och så vidare) och stegspecifika inställningar (instanstyp och antal, ramverksversion och så vidare). I det här exemplet använder vi en säker installation och definierar även undernät, säkerhetsgrupper och krypteringen av intercontainertrafik. För det här exemplet behöver du en pipelineexekveringsroll med SageMaker full access och en VPC. Se följande kod:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

För att demonstrera, kör följande kodexempel ett PySpark-skript på SageMaker Processing inom en pipeline genom att använda PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Som visas i föregående kod skriver vi över standard Spark-konfigurationer genom att tillhandahålla configuration.json som en ProcessingInput. Vi använder en configuration.json fil som sparades i Amazon enkel lagringstjänst (Amazon S3) med följande inställningar:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Vi kan uppdatera standard Spark-konfigurationen antingen genom att skicka filen som en ProcessingInput eller genom att använda konfigurationsargumentet när du kör run() funktion.

Spark-konfigurationen är beroende av andra alternativ, som instanstypen och antalet instanser som valts för bearbetningsjobbet. Den första hänsynen är antalet instanser, vCPU-kärnorna som var och en av dessa instanser har och instansminnet. Du kan använda Spark UI or CloudWatch-instansmätvärden och loggar för att kalibrera dessa värden över flera körningar.

Dessutom kan executor- och drivrutinsinställningarna optimeras ytterligare. För ett exempel på hur man beräknar dessa, se Bästa metoder för att framgångsrikt hantera minne för Apache Spark-applikationer på Amazon EMR.

Därefter, för drivrutins- och exekveringsinställningar, rekommenderar vi att du undersöker committerinställningarna för att förbättra prestandan när du skriver till Amazon S3. I vårt fall skriver vi parkettfiler till Amazon S3 och ställer in "spark.sql.parquet.fs.optimized.comitter.optimization-enabled”Till sant.

Om det behövs för en anslutning till Amazon S3, en regional slutpunkt "spark.hadoop.fs.s3a.endpoint” kan anges i konfigurationsfilen.

I detta exempel pipeline, PySpark-skriptet spark_process.py (som visas i följande kod) laddar en CSV-fil från Amazon S3 till en Spark-dataram och sparar data som Parquet tillbaka till Amazon S3.

Observera att vår exempelkonfiguration inte står i proportion till arbetsbelastningen eftersom läsning och skrivning av abalone-datauppsättningen kan göras med standardinställningar på en instans. De konfigurationer vi nämnde bör definieras utifrån dina specifika behov.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

För att dyka in i att optimera Spark-bearbetningsjobb kan du använda CloudWatch-loggarna såväl som Spark-gränssnittet. Du kan skapa Spark-gränssnittet genom att köra ett bearbetningsjobb på en SageMaker-anteckningsbokförekomst. Du kan se Spark UI för bearbetningsjobb som körs inom en pipeline by kör historikservern i en SageMaker notebook-instans om Spark UI-loggarna sparades på samma Amazon S3-plats.

Städa upp

Om du följde handledningen är det bra att ta bort resurser som inte längre används för att sluta ta på dig avgifter. Se till att ta bort CloudFormation-stacken som du använde för att skapa dina resurser. Detta kommer att ta bort den skapade stacken såväl som resurserna den skapade.

Slutsats

I det här inlägget visade vi hur man kör ett säkert SageMaker Processing-jobb med PySpark inom SageMaker Pipelines. Vi visade också hur du optimerar PySpark med hjälp av Spark-konfigurationer och ställer in ditt bearbetningsjobb så att det körs i en säker nätverkskonfiguration.

Som ett nästa steg, utforska hur man automatiserar hela modellens livscykel och hur kunder byggde säkra och skalbara MLOps-plattformar använder SageMakers tjänster.


Om författarna

Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Maren Suilmann är Data Scientist på AWS professionella tjänster. Hon arbetar med kunder i olika branscher och avslöjar kraften hos AI/ML för att uppnå sina affärsresultat. Maren har varit med AWS sedan november 2019. På fritiden gillar hon kickboxning, vandring till fantastiska vyer och brädspelskvällar.


Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Maira Ladeira Tanke
är ML-specialist på AWS. Med en bakgrund inom datavetenskap har hon 9 års erfarenhet av att arkitektur och bygga ML-applikationer med kunder över branscher. Som teknisk ledare hjälper hon kunder att accelerera deras uppnående av affärsvärde genom framväxande teknologier och innovativa lösningar. På fritiden tycker Maira om att resa och umgås med sin familj på en varm plats.


Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Pauline Ting
är Data Scientist i AWS professionella tjänster team. Hon stödjer kunder i att uppnå och accelerera deras affärsresultat genom att utveckla AI/ML-lösningar. På fritiden tycker Pauline om att resa, surfa och prova nya dessertställen.


Kör säkra bearbetningsjobb med PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertikal sökning. Ai.Donald Fossouo
är Sr Data Architect i AWS professionella tjänster team som mestadels arbetar med Global Finance Service. Han engagerar sig med kunder för att skapa innovativa lösningar som tar itu med kundernas affärsproblem och påskyndar införandet av AWS-tjänster. På fritiden tycker Donald om att läsa, springa och resa.

Tidsstämpel:

Mer från AWS maskininlärning