Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines

Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines

Amazon SageMaker Studio kan hjælpe dig med at bygge, træne, fejlfinde, implementere og overvåge dine modeller og administrere dine maskinlærings- (ML) arbejdsgange. Amazon SageMaker Pipelines giver dig mulighed for at bygge en sikker, skalerbar og fleksibel MLOps-platform i studiet.

I dette indlæg forklarer vi, hvordan man kører PySpark-behandlingsjob inden for en pipeline. Dette gør det muligt for enhver, der ønsker at træne en model ved hjælp af Pipelines, også at forbehandle træningsdata, postproces-inferensdata eller evaluere modeller ved hjælp af PySpark. Denne mulighed er især relevant, når du skal behandle data i stor skala. Derudover viser vi, hvordan du optimerer dine PySpark-trin ved hjælp af konfigurationer og Spark UI-logfiler.

Rørledninger er en Amazon SageMaker værktøj til at bygge og administrere end-to-end ML pipelines. Det er en fuldt administreret on-demand-tjeneste, integreret med SageMaker og andre AWS-tjenester, og derfor opretter og administrerer ressourcer for dig. Dette sikrer, at forekomster kun klargøres og bruges, når rørledningerne køres. Endvidere er Pipelines støttet af SageMaker Python SDK, så du kan spore din data afstamning , genbrugstrin ved at cache dem for at lette udviklingstiden og omkostningerne. En SageMaker pipeline kan bruge behandlingstrin at behandle data eller udføre modelevaluering.

Ved behandling af data i stor skala bruger datavidenskabsmænd og ML-ingeniører ofte PySpark, en grænseflade til Apache Spark i Python. SageMaker leverer forudbyggede Docker-billeder, der inkluderer PySpark og andre afhængigheder, der er nødvendige for at køre distribuerede databehandlingsjob, inklusive datatransformationer og funktionsteknologi ved hjælp af Spark-rammeværket. Selvom disse billeder giver dig mulighed for hurtigt at begynde at bruge PySpark i behandlingsjob, kræver databehandling i stor skala ofte specifikke Spark-konfigurationer for at optimere den distribuerede databehandling af klyngen skabt af SageMaker.

I vores eksempel opretter vi en SageMaker-pipeline, der kører et enkelt behandlingstrin. For mere information om, hvilke andre trin du kan tilføje til en pipeline, se Rørledningstrin.

SageMaker Processing bibliotek

SageMaker Processing kan køre med specifikke rammer (for eksempel SKlearnProcessor, PySparkProcessor eller Hugging Face). Uafhængigt af den anvendte ramme, hver Bearbejdningstrin kræver følgende:

  • Trinnavn – Navnet, der skal bruges til dit SageMaker-pipelinetrin
  • Trin argumenter – Argumenterne for din ProcessingStep

Derudover kan du give følgende:

  • Konfigurationen af ​​din step cache for at undgå unødvendige kørsler af dit step i en SageMaker pipeline
  • En liste over trinnavne, trinforekomster eller trinindsamlingsforekomster, som ProcessingStep afhænger
  • Visningsnavnet på ProcessingStep
  • En beskrivelse af ProcessingStep
  • Ejendomsfiler
  • Prøv politikker igen

Argumenterne overdrages til ProcessingStep. Du kan bruge sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor klasse for at køre din Spark-applikation inde i et behandlingsjob.

Hver processor kommer med sine egne behov, afhængigt af rammerne. Dette illustreres bedst ved hjælp af PySparkProcessor, hvor du kan videregive yderligere oplysninger for at optimere ProcessingStep yderligere, for eksempel via configuration parameter, når du kører dit job.

Kør SageMaker Processing-job i et sikkert miljø

Det er bedste praksis at oprette en privat Amazon VPC og konfigurere den, så dine jobs ikke er tilgængelige via det offentlige internet. SageMaker Processing-job giver dig mulighed for at specificere de private undernet og sikkerhedsgrupper i din VPC samt aktivere netværksisolering og inter-container trafikkryptering ved hjælp af NetworkConfig.VpcConfig anmodningsparameter for CreateProcessingJob API. Vi giver eksempler på denne konfiguration ved hjælp af SageMaker SDK i næste afsnit.

PySpark ProcessingStep i SageMaker Pipelines

I dette eksempel antager vi, at du har Studio installeret i et sikkert miljø, der allerede er tilgængeligt, inklusive VPC, VPC-slutpunkter, sikkerhedsgrupper, AWS identitets- og adgangsstyring (IAM) roller, og AWS Key Management Service (AWS KMS) taster. Vi antager også, at du har to buckets: en til artefakter som kode og logfiler og en til dine data. Det basic_infra.yaml fil giver et eksempel AWS CloudFormation kode for at tilvejebringe den nødvendige forudsætningsinfrastruktur. Eksempelkoden og implementeringsvejledningen er også tilgængelig på GitHub.

Som et eksempel opretter vi en pipeline indeholdende en enkelt ProcessingStep hvor vi simpelthen læser og skriver abalone datasæt ved hjælp af Spark. Kodeeksemplerne viser dig, hvordan du opsætter og konfigurerer ProcessingStep.

Vi definerer parametre for pipelinen (navn, rolle, buckets og så videre) og trinspecifikke indstillinger (forekomsttype og antal, rammeversion og så videre). I dette eksempel bruger vi en sikker opsætning og definerer også undernet, sikkerhedsgrupper og inter-container trafikkryptering. Til dette eksempel har du brug for en pipeline-udførelsesrolle med SageMaker fuld adgang og en VPC. Se følgende kode:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

For at demonstrere kører følgende kodeeksempel et PySpark-script på SageMaker Processing i en pipeline ved at bruge PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Som vist i den foregående kode, overskriver vi standard Spark-konfigurationerne ved at levere configuration.json som en ProcessingInput. Vi bruger en configuration.json fil, der blev gemt i Amazon Simple Storage Service (Amazon S3) med følgende indstillinger:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Vi kan opdatere standard Spark-konfigurationen enten ved at overføre filen som en ProcessingInput eller ved at bruge konfigurationsargumentet, når du kører run() funktion.

Spark-konfigurationen er afhængig af andre muligheder, såsom den forekomsttype og det antal forekomster, der er valgt for behandlingsjobbet. Den første overvejelse er antallet af forekomster, vCPU-kernerne, som hver af disse forekomster har, og forekomsthukommelsen. Du kan bruge Spark UI'er or CloudWatch-forekomstmålinger og logfiler for at kalibrere disse værdier over flere gentagelser.

Derudover kan executor- og driverindstillingerne optimeres yderligere. For et eksempel på, hvordan man beregner disse, henvises til Bedste praksis for succesfuld håndtering af hukommelse til Apache Spark-applikationer på Amazon EMR.

Dernæst, for driver- og udførerindstillinger, anbefaler vi at undersøge committer-indstillingerne for at forbedre ydeevnen, når du skriver til Amazon S3. I vores tilfælde skriver vi Parket-filer til Amazon S3 og indstiller "spark.sql.parquet.fs.optimized.comitter.optimization-enabled"til sandt.

Hvis det er nødvendigt for en forbindelse til Amazon S3, et regionalt slutpunkt "spark.hadoop.fs.s3a.endpoint” kan angives i konfigurationsfilen.

I dette eksempel pipeline, PySpark-scriptet spark_process.py (som vist i følgende kode) indlæser en CSV-fil fra Amazon S3 i en Spark-dataramme og gemmer dataene som Parket tilbage til Amazon S3.

Bemærk, at vores eksempelkonfiguration ikke er proportional med arbejdsbyrden, fordi læsning og skrivning af abalone-datasættet kunne udføres på standardindstillinger på én instans. De konfigurationer, vi nævnte, bør defineres ud fra dine specifikke behov.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

For at dykke ned i optimering af Spark-behandlingsjob kan du bruge CloudWatch-logfilerne samt Spark-brugergrænsefladen. Du kan oprette Spark UI ved at køre et behandlingsjob på en SageMaker notebook-instans. Du kan se Spark UI til behandlingsjob, der kører inden for en pipeline by kører historieserveren i en SageMaker notebook-forekomst, hvis Spark UI-logfilerne blev gemt på den samme Amazon S3-placering.

Ryd op

Hvis du fulgte selvstudiet, er det god praksis at slette ressourcer, der ikke længere bruges til at stoppe med at påløbe gebyrer. Sørg for at slet CloudFormation-stakken som du brugte til at oprette dine ressourcer. Dette vil slette den oprettede stak såvel som de ressourcer, den oprettede.

Konklusion

I dette indlæg viste vi, hvordan man kører et sikkert SageMaker Processing-job ved hjælp af PySpark i SageMaker Pipelines. Vi demonstrerede også, hvordan du optimerer PySpark ved hjælp af Spark-konfigurationer og konfigurerer dit behandlingsjob til at køre i en sikker netværkskonfiguration.

Som et næste trin kan du undersøge, hvordan man automatiserer hele modellens livscyklus og hvordan kunder byggede sikre og skalerbare MLOps-platforme ved at bruge SageMaker-tjenester.


Om forfatterne

Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Maren Suilmann er Data Scientist hos AWS Professional Services. Hun arbejder med kunder på tværs af brancher og afslører styrken af ​​AI/ML for at opnå deres forretningsresultater. Maren har været hos AWS siden november 2019. I sin fritid nyder hun kickboksning, vandreture til fantastiske udsigter og brætspilsaftener.


Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Maira Ladeira Tanke
er ML Specialist hos AWS. Med en baggrund i data science har hun 9 års erfaring med at arkitekte og bygge ML-applikationer med kunder på tværs af brancher. Som teknisk leder hjælper hun kunder med at accelerere deres opnåelse af forretningsværdi gennem nye teknologier og innovative løsninger. I sin fritid nyder Maira at rejse og tilbringe tid med sin familie et varmt sted.


Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Pauline Ting
er Data Scientist i AWS Professional Services hold. Hun støtter kunder i at opnå og accelerere deres forretningsresultat ved at udvikle AI/ML-løsninger. I sin fritid nyder Pauline at rejse, surfe og prøve nye dessertsteder.


Kør sikre behandlingsjob ved hjælp af PySpark i Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Lodret søgning. Ai.Donald Fossouo
er Sr Data Architect i AWS Professional Services team, der for det meste arbejder med Global Finance Service. Han engagerer sig med kunder for at skabe innovative løsninger, der løser kundernes forretningsproblemer og fremskynder adoptionen af ​​AWS-tjenester. I sin fritid nyder Anders at læse, løbe og rejse.

Tidsstempel:

Mere fra AWS maskinindlæring