Voer beveiligde verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines

Voer beveiligde verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines

Amazon SageMaker Studio kan u helpen bij het bouwen, trainen, debuggen, implementeren en bewaken van uw modellen en het beheren van uw machine learning (ML)-workflows. Amazon SageMaker-pijpleidingen stelt u in staat om een veilig, schaalbaar en flexibel MLOps-platform binnen Studio.

In dit bericht leggen we uit hoe u PySpark-verwerkingstaken binnen een pijplijn kunt uitvoeren. Dit stelt iedereen die een model wil trainen met behulp van Pipelines in staat om ook trainingsgegevens voor te verwerken, gevolgtrekkingsgegevens na te verwerken of modellen te evalueren met behulp van PySpark. Deze mogelijkheid is vooral relevant wanneer u grootschalige gegevens moet verwerken. Daarnaast laten we zien hoe u uw PySpark-stappen kunt optimaliseren met behulp van configuraties en Spark UI-logboeken.

Pijpleidingen is een Amazon Sage Maker tool voor het bouwen en beheren van end-to-end ML-pijplijnen. Het is een volledig beheerde on-demand service, geïntegreerd met SageMaker en andere AWS-services, en creëert en beheert daarom bronnen voor u. Dit zorgt ervoor dat instanties alleen worden ingericht en gebruikt bij het uitvoeren van de pijplijnen. Bovendien wordt Pipelines ondersteund door de SageMaker Python-SDK, zodat u uw datalijn en stappen hergebruiken door ze in de cache op te slaan om ontwikkelingstijd en -kosten te verminderen. Een SageMaker-pijplijn kan gebruiken verwerkingsstappen om gegevens te verwerken of modelevaluatie uit te voeren.

Bij het verwerken van grootschalige gegevens gebruiken datawetenschappers en ML-ingenieurs vaak PySpark, een interface voor Apache Spark in Python. SageMaker biedt kant-en-klare Docker-images die PySpark en andere afhankelijkheden bevatten die nodig zijn om gedistribueerde gegevensverwerkingstaken uit te voeren, inclusief gegevenstransformaties en functie-engineering met behulp van het Spark-framework. Hoewel u met deze afbeeldingen PySpark snel kunt gaan gebruiken bij het verwerken van taken, vereist grootschalige gegevensverwerking vaak specifieke Spark-configuraties om de gedistribueerde computerverwerking van het door SageMaker gecreëerde cluster te optimaliseren.

In ons voorbeeld maken we een SageMaker-pijplijn met een enkele verwerkingsstap. Raadpleeg voor meer informatie over welke andere stappen u aan een pijplijn kunt toevoegen Pijplijn stappen.

SageMaker Processing-bibliotheek

SageMaker Processing kan worden uitgevoerd met specifieke frameworks (bijvoorbeeld SKlearnProcessor, PySparkProcessor of Hugging Face). Onafhankelijk van het gebruikte raamwerk, elk Verwerking Stap vereist het volgende:

  • Stap naam – De naam die moet worden gebruikt voor uw SageMaker-pijplijnstap
  • Stap argumenten – De argumenten voor uw ProcessingStep

Daarnaast kunt u het volgende verstrekken:

  • De configuratie voor uw step-cache om onnodige runs van uw step in een SageMaker-pijplijn te voorkomen
  • Een lijst met stapnamen, stapexemplaren of stepverzamelingsinstanties die de ProcessingStep hangt af van
  • De weergavenaam van de ProcessingStep
  • Een beschrijving van de ProcessingStep
  • Eigendom bestanden
  • Beleid voor opnieuw proberen

De argumenten worden overgedragen aan de ProcessingStep. Je kunt de sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class om uw Spark-toepassing uit te voeren binnen een verwerkingstaak.

Elke processor heeft zijn eigen behoeften, afhankelijk van het framework. Dit wordt het beste geïllustreerd aan de hand van de PySparkProcessor, waar u aanvullende informatie kunt doorgeven om de ProcessingStep verder, bijvoorbeeld via de configuration parameter bij het uitvoeren van uw taak.

Voer SageMaker Processing-taken uit in een beveiligde omgeving

Het is best practice om een ​​privé Amazon VPC te maken en deze zo te configureren dat uw taken niet toegankelijk zijn via het openbare internet. Met SageMaker Processing-taken kunt u de privésubnetten en beveiligingsgroepen in uw VPC specificeren en netwerkisolatie en intercontainerverkeerscodering inschakelen met behulp van de NetworkConfig.VpcConfig verzoekparameter van de CreateProcessingJob API. We geven voorbeelden van deze configuratie met behulp van de SageMaker-SDK in de volgende sectie.

PySpark ProcessingStep binnen SageMaker Pipelines

Voor dit voorbeeld gaan we ervan uit dat u Studio hebt geïmplementeerd in een beveiligde omgeving die al beschikbaar is, inclusief VPC, VPC-eindpunten, beveiligingsgroepen, AWS Identiteits- en toegangsbeheer (IAM) rollen, en AWS Sleutelbeheerservice (AWS KMS) toetsen. We gaan er ook van uit dat u twee buckets heeft: één voor artefacten zoals code en logboeken, en één voor uw gegevens. De basis_infra.yaml bestand geeft een voorbeeld AWS CloudFormatie code om de noodzakelijke vereiste infrastructuur in te richten. De voorbeeldcode en implementatiegids is ook beschikbaar op GitHub.

Als voorbeeld hebben we een pijplijn opgezet met daarin een single ProcessingStep waarin we gewoon lezen en schrijven de abalone-gegevensset Spark gebruiken. De codevoorbeelden laten zien hoe u het ProcessingStep.

We definiëren parameters voor de pijplijn (naam, rol, buckets, enzovoort) en stapspecifieke instellingen (instancetype en aantal, frameworkversie, enzovoort). In dit voorbeeld gebruiken we een veilige configuratie en definiëren we ook subnetten, beveiligingsgroepen en de versleuteling van het intercontainerverkeer. Voor dit voorbeeld hebt u een pijplijnuitvoeringsrol nodig met volledige toegang tot SageMaker en een VPC. Zie de volgende code:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Ter demonstratie voert het volgende codevoorbeeld een PySpark-script uit op SageMaker Processing binnen een pijplijn met behulp van de PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Zoals te zien is in de voorgaande code, overschrijven we de standaard Spark-configuraties door op te geven configuration.json een ProcessingInput​ We gebruiken een configuration.json bestand waarin is opgeslagen Amazon eenvoudige opslagservice (Amazon S3) met de volgende instellingen:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

We kunnen de standaard Spark-configuratie bijwerken door het bestand door te geven als een ProcessingInput of door het configuratieargument te gebruiken bij het uitvoeren van het run() functie.

De Spark-configuratie is afhankelijk van andere opties, zoals het exemplaartype en het aantal exemplaren dat is gekozen voor de verwerkingstaak. De eerste overweging is het aantal instanties, de vCPU-kernen die elk van die instanties hebben en het instantiegeheugen. Je kunt gebruiken Spark-gebruikersinterfaces or Statistieken van CloudWatch-instanties en logboeken om deze waarden te kalibreren over meerdere run-iteraties.

Daarnaast kunnen de instellingen van de uitvoerder en het stuurprogramma nog verder worden geoptimaliseerd. Zie voor een voorbeeld van hoe u deze kunt berekenen Best practices voor het succesvol beheren van geheugen voor Apache Spark-applicaties op Amazon EMR.

Vervolgens raden we aan voor stuurprogramma- en uitvoerderinstellingen de instellingen van de committer te onderzoeken om de prestaties te verbeteren bij het schrijven naar Amazon S3. In ons geval schrijven we Parquet-bestanden naar Amazon S3 en stellen we "spark.sql.parquet.fs.optimized.comitter.optimization-enabled”Naar true.

Indien nodig voor een verbinding met Amazon S3, een regionaal eindpunt "spark.hadoop.fs.s3a.endpoint” kan worden opgegeven in het configuratiebestand.

In deze voorbeeldpijplijn is het PySpark-script spark_process.py (zoals weergegeven in de volgende code) laadt een CSV-bestand van Amazon S3 in een Spark-dataframe en slaat de gegevens op als Parquet terug naar Amazon S3.

Houd er rekening mee dat onze voorbeeldconfiguratie niet in verhouding staat tot de werklast, omdat het lezen en schrijven van de abalone dataset kan worden gedaan met standaardinstellingen op één exemplaar. De configuraties die we hebben genoemd, moeten worden gedefinieerd op basis van uw specifieke behoeften.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Als u zich wilt verdiepen in het optimaliseren van Spark-verwerkingstaken, kunt u zowel de CloudWatch-logboeken als de Spark-gebruikersinterface gebruiken. U kunt de Spark-gebruikersinterface maken door een verwerkingstaak uit te voeren op een SageMaker-notebookinstantie. U kunt de Spark-gebruikersinterface voor de verwerkingstaken die binnen een pijplijn worden uitgevoerd by het uitvoeren van de geschiedenisserver binnen een SageMaker-notebookinstantie als de Spark UI-logboeken zijn opgeslagen op dezelfde Amazon S3-locatie.

Opruimen

Als u de zelfstudie hebt gevolgd, is het een goede gewoonte om bronnen te verwijderen die niet langer worden gebruikt om te voorkomen dat er kosten in rekening worden gebracht. Zorg ervoor dat verwijder de CloudFormation-stack die u gebruikte om uw bronnen te maken. Hiermee wordt de gemaakte stapel verwijderd, evenals de bronnen die deze heeft gemaakt.

Conclusie

In dit bericht hebben we laten zien hoe u een veilige SageMaker Processing-taak uitvoert met PySpark binnen SageMaker Pipelines. We hebben ook gedemonstreerd hoe u PySpark kunt optimaliseren met behulp van Spark-configuraties en hoe u uw Processing-taak kunt instellen om te worden uitgevoerd in een veilige netwerkconfiguratie.

Onderzoek als volgende stap hoe u de volledige levenscyclus van het model kunt automatiseren en hoe klanten bouwden veilige en schaalbare MLOps-platforms met behulp van SageMaker-services.


Over de auteurs

Voer veilige verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Maren Suilman is een Data Scientist bij AWS professionele services. Ze werkt samen met klanten in verschillende sectoren en onthult de kracht van AI/ML om hun bedrijfsresultaten te bereiken. Maren werkt sinds november 2019 bij AWS. In haar vrije tijd houdt ze van kickboksen, wandelen naar geweldige uitzichten en bordspelavonden.


Voer veilige verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Maira Ladeira Tanke
is ML-specialist bij AWS. Met een achtergrond in datawetenschap heeft ze 9 jaar ervaring met het ontwerpen en bouwen van ML-applicaties bij klanten in verschillende sectoren. Als technisch leider helpt ze klanten hun zakelijke waarde sneller te realiseren door middel van opkomende technologieën en innovatieve oplossingen. In haar vrije tijd houdt Maira ervan om te reizen en tijd door te brengen met haar familie op een warme plek.


Voer veilige verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Pauline Ting
is datawetenschapper in de AWS professionele services team. Ze ondersteunt klanten bij het behalen en versnellen van hun bedrijfsresultaten door AI/ML-oplossingen te ontwikkelen. In haar vrije tijd houdt Pauline van reizen, surfen en nieuwe toetjes uitproberen.


Voer veilige verwerkingstaken uit met PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Verticaal zoeken. Ai.Donald Fossouo
is Sr Data Architect in de AWS professionele services team, meestal in samenwerking met Global Finance Service. Hij werkt samen met klanten om innovatieve oplossingen te creëren die zakelijke problemen van klanten aanpakken en de acceptatie van AWS-services versnellen. In zijn vrije tijd houdt Donald van lezen, hardlopen en reizen.

Tijdstempel:

Meer van AWS-machine learning