Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker-Pipelines aus

Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker-Pipelines aus

Amazon SageMaker-Studio kann Ihnen beim Erstellen, Trainieren, Debuggen, Bereitstellen und Überwachen Ihrer Modelle sowie beim Verwalten Ihrer Workflows für maschinelles Lernen (ML) helfen. Amazon SageMaker-Pipelines ermöglicht Ihnen den Aufbau einer sichere, skalierbare und flexible MLOps-Plattform im Studio.

In diesem Beitrag erklären wir, wie PySpark-Verarbeitungsjobs innerhalb einer Pipeline ausgeführt werden. Dadurch kann jeder, der ein Modell mit Pipelines trainieren möchte, auch Trainingsdaten vorverarbeiten, Inferenzdaten nachbearbeiten oder Modelle mit PySpark auswerten. Diese Funktion ist besonders relevant, wenn Sie große Datenmengen verarbeiten müssen. Darüber hinaus zeigen wir, wie Sie Ihre PySpark-Schritte mithilfe von Konfigurationen und Spark-UI-Protokollen optimieren können.

Pipelines ist ein Amazon Sage Maker Tool zum Erstellen und Verwalten von End-to-End-ML-Pipelines. Es ist ein vollständig verwalteter On-Demand-Service, der in SageMaker und andere AWS-Services integriert ist und daher Ressourcen für Sie erstellt und verwaltet. Dadurch wird sichergestellt, dass Instanzen nur bereitgestellt und verwendet werden, wenn die Pipelines ausgeführt werden. Darüber hinaus wird Pipelines von der unterstützt SageMaker Python-SDK, damit Sie Ihre verfolgen können Datenherkunft und Schritte wiederverwenden indem Sie sie zwischenspeichern, um Entwicklungszeit und -kosten zu reduzieren. Eine SageMaker-Pipeline kann verwenden Verarbeitungsschritte um Daten zu verarbeiten oder Modellbewertungen durchzuführen.

Bei der Verarbeitung großer Datenmengen verwenden Data Scientists und ML-Ingenieure häufig PySpark, eine Schnittstelle für Apache Funken in Python. SageMaker stellt vorgefertigte Docker-Images bereit, die PySpark und andere Abhängigkeiten enthalten, die zum Ausführen verteilter Datenverarbeitungsaufträge erforderlich sind, einschließlich Datentransformationen und Feature-Engineering mit dem Spark-Framework. Obwohl Sie mit diesen Images schnell mit der Verwendung von PySpark bei der Verarbeitung von Jobs beginnen können, erfordert die groß angelegte Datenverarbeitung häufig spezifische Spark-Konfigurationen, um die verteilte Datenverarbeitung des von SageMaker erstellten Clusters zu optimieren.

In unserem Beispiel erstellen wir eine SageMaker-Pipeline, die einen einzelnen Verarbeitungsschritt ausführt. Weitere Informationen darüber, welche anderen Schritte Sie einer Pipeline hinzufügen können, finden Sie unter Pipeline-Schritte.

SageMaker Processing-Bibliothek

SageMaker Processing kann mit bestimmten ausgeführt werden Gerüste (z. B. SKlearnProcessor, PySparkProcessor oder Hugging Face). Unabhängig vom verwendeten Framework, jedes Verarbeitungsschritt erfordert Folgendes:

  • Schrittname – Der Name, der für Ihren SageMaker-Pipelineschritt verwendet werden soll
  • Schritt-Argumente – Die Argumente für Ihre ProcessingStep

Darüber hinaus können Sie Folgendes angeben:

  • Die Konfiguration für Ihren Step-Cache, um unnötige Ausführungen Ihres Steps in einer SageMaker-Pipeline zu vermeiden
  • Eine Liste mit Schrittnamen, Schrittinstanzen oder Schrittsammlungsinstanzen, die die ProcessingStep hängt
  • Der Anzeigename der ProcessingStep
  • Eine Beschreibung der ProcessingStep
  • Eigenschaftsdateien
  • Wiederholungsrichtlinien

Die Argumente werden an die übergeben ProcessingStep. Sie können die sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor -Klasse, um Ihre Spark-Anwendung innerhalb eines Verarbeitungsauftrags auszuführen.

Jeder Prozessor hat je nach Framework seine eigenen Anforderungen. Dies lässt sich am besten anhand von veranschaulichen PySparkProcessor, wo Sie zusätzliche Informationen zur Optimierung übergeben können ProcessingStep weiter, zum Beispiel über die configuration Parameter, wenn Sie Ihren Job ausführen.

Führen Sie SageMaker-Verarbeitungsjobs in einer sicheren Umgebung aus

Es ist best Practices um eine private Amazon VPC zu erstellen und sie so zu konfigurieren, dass Ihre Jobs nicht über das öffentliche Internet zugänglich sind. Mit SageMaker-Verarbeitungsjobs können Sie die privaten Subnetze und Sicherheitsgruppen in Ihrer VPC angeben sowie die Netzwerkisolierung und die Verschlüsselung des Datenverkehrs zwischen Containern mithilfe von aktivieren NetworkConfig.VpcConfig Anforderungsparameter der CreateProcessingJob API. Wir stellen Beispiele für diese Konfiguration unter Verwendung von bereit SageMaker-SDK im nächsten Abschnitt.

PySpark ProcessingStep innerhalb von SageMaker-Pipelines

Für dieses Beispiel gehen wir davon aus, dass Sie Studio in einer bereits verfügbaren sicheren Umgebung bereitgestellt haben, einschließlich VPC, VPC-Endpunkte, Sicherheitsgruppen, AWS Identity and Access Management and (IAM)-Rollen und AWS-Schlüsselverwaltungsservice (AWS KMS)-Schlüssel. Wir gehen außerdem davon aus, dass Sie zwei Buckets haben: einen für Artefakte wie Code und Protokolle und einen für Ihre Daten. Der basic_infra.yaml Datei enthält ein Beispiel AWS CloudFormation Code, um die erforderliche erforderliche Infrastruktur bereitzustellen. Der Beispielcode und das Bereitstellungshandbuch sind auch unter verfügbar GitHub.

Als Beispiel richten wir eine Pipeline ein, die eine Single enthält ProcessingStep in dem wir einfach lesen und schreiben Abalone-Datensatz mit Spark. Die Codebeispiele zeigen Ihnen, wie Sie die einrichten und konfigurieren ProcessingStep.

Wir definieren Parameter für die Pipeline (Name, Rolle, Buckets usw.) und schrittspezifische Einstellungen (Instanztyp und -anzahl, Framework-Version usw.). In diesem Beispiel verwenden wir ein sicheres Setup und definieren auch Subnetze, Sicherheitsgruppen und die Verschlüsselung des Datenverkehrs zwischen den Containern. Für dieses Beispiel benötigen Sie eine Pipeline-Ausführungsrolle mit SageMaker-Vollzugriff und eine VPC. Siehe folgenden Code:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Zur Veranschaulichung führt das folgende Codebeispiel ein PySpark-Skript auf der SageMaker-Verarbeitung innerhalb einer Pipeline aus, indem es die PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Wie im vorherigen Code gezeigt, überschreiben wir die standardmäßigen Spark-Konfigurationen, indem wir bereitstellen configuration.json als ein ProcessingInput. Wir benutzen ein configuration.json Datei, in der gespeichert wurde Amazon Simple Storage-Service (Amazon S3) mit folgenden Einstellungen:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Wir können die standardmäßige Spark-Konfiguration entweder aktualisieren, indem wir die Datei als ProcessingInput oder indem Sie das Konfigurationsargument beim Ausführen von verwenden run() Funktion.

Die Spark-Konfiguration ist von anderen Optionen abhängig, wie dem Instance-Typ und der Instance-Anzahl, die für den Verarbeitungsauftrag ausgewählt wurden. Die erste Überlegung ist die Anzahl der Instanzen, die vCPU-Kerne, die jede dieser Instanzen hat, und der Instanzspeicher. Sie können verwenden Spark-Benutzeroberflächen or CloudWatch-Instanzmetriken und Protokolle, um diese Werte über mehrere Laufiterationen zu kalibrieren.

Darüber hinaus können die Executor- und Treibereinstellungen noch weiter optimiert werden. Ein Beispiel zur Berechnung dieser finden Sie unter Best Practices für die erfolgreiche Speicherverwaltung für Apache Spark-Anwendungen auf Amazon EMR.

Als Nächstes empfehlen wir für Treiber- und Executor-Einstellungen, die Committer-Einstellungen zu untersuchen, um die Leistung beim Schreiben in Amazon S3 zu verbessern. In unserem Fall schreiben wir Parquet-Dateien in Amazon S3 und setzen „spark.sql.parquet.fs.optimized.comitter.optimization-enabled”Um wahr zu sein.

Falls für eine Verbindung zu Amazon S3 erforderlich, ein regionaler Endpunkt „spark.hadoop.fs.s3a.endpoint“ kann in der Konfigurationsdatei angegeben werden.

In dieser Beispielpipeline das PySpark-Skript spark_process.py (wie im folgenden Code gezeigt) lädt eine CSV-Datei von Amazon S3 in einen Spark-Datenrahmen und speichert die Daten als Parquet zurück in Amazon S3.

Beachten Sie, dass unsere Beispielkonfiguration nicht proportional zur Arbeitslast ist, da das Lesen und Schreiben des Abalone-Datensatzes auf einer Instanz mit den Standardeinstellungen erfolgen könnte. Die von uns erwähnten Konfigurationen sollten basierend auf Ihren spezifischen Anforderungen definiert werden.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Um in die Optimierung von Spark-Verarbeitungsaufträgen einzutauchen, können Sie die CloudWatch-Protokolle sowie die Spark-Benutzeroberfläche verwenden. Sie können die Spark-Benutzeroberfläche erstellen, indem Sie einen Processing-Job auf einer SageMaker-Notebook-Instance ausführen. Sie können die anzeigen Spark-Benutzeroberfläche für die Verarbeitungsaufträge, die in einer Pipeline ausgeführt werden by Ausführen des History-Servers innerhalb einer SageMaker-Notebook-Instance, wenn die Spark-UI-Protokolle am selben Amazon S3-Speicherort gespeichert wurden.

Aufräumen

Wenn Sie das Tutorial befolgt haben, empfiehlt es sich, Ressourcen zu löschen, die nicht mehr verwendet werden, um das Entstehen von Gebühren zu verhindern. Stellen Sie sicher, dass Löschen Sie den CloudFormation-Stack die Sie zum Erstellen Ihrer Ressourcen verwendet haben. Dadurch werden der erstellte Stapel sowie die von ihm erstellten Ressourcen gelöscht.

Zusammenfassung

In diesem Beitrag haben wir gezeigt, wie Sie einen sicheren SageMaker-Verarbeitungsjob mit PySpark in SageMaker-Pipelines ausführen. Wir haben auch gezeigt, wie Sie PySpark mithilfe von Spark-Konfigurationen optimieren und Ihren Verarbeitungsauftrag so einrichten, dass er in einer sicheren Netzwerkkonfiguration ausgeführt wird.

Untersuchen Sie im nächsten Schritt, wie Sie den gesamten Modelllebenszyklus automatisieren können und wie Kunden haben sichere und skalierbare MLOps-Plattformen entwickelt Verwenden von SageMaker-Diensten.


Über die Autoren

Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence aus. Vertikale Suche. Ai.Maren Suilmann ist Data Scientist bei Professionelle AWS-Services. Sie arbeitet mit Kunden aus allen Branchen zusammen, um die Leistungsfähigkeit von KI/ML zu enthüllen, um ihre Geschäftsergebnisse zu erzielen. Maren ist seit November 2019 bei AWS. In ihrer Freizeit liebt sie Kickboxen, Wanderungen zu großartigen Aussichten und Brettspielabende.


Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence aus. Vertikale Suche. Ai.Maira Ladeira Tanke
ist ML-Spezialist bei AWS. Mit einem Hintergrund in Data Science verfügt sie über 9 Jahre Erfahrung in der Architektur und Erstellung von ML-Anwendungen mit Kunden aus allen Branchen. Als technische Leiterin hilft sie Kunden dabei, ihre geschäftliche Wertschöpfung durch neue Technologien und innovative Lösungen zu beschleunigen. In ihrer Freizeit reist Maira gerne und verbringt Zeit mit ihrer Familie an einem warmen Ort.


Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence aus. Vertikale Suche. Ai.Pauline Ting
ist Data Scientist in der Professionelle AWS-Services Team. Sie unterstützt Kunden beim Erreichen und Beschleunigen ihrer Geschäftsergebnisse durch die Entwicklung von KI/ML-Lösungen. In ihrer Freizeit reist Pauline gerne, surft und probiert neue Dessertlokale aus.


Führen Sie sichere Verarbeitungsaufträge mit PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence aus. Vertikale Suche. Ai.Donald Fosso
ist Senior Data Architect in der Professionelle AWS-Services Team, das hauptsächlich mit Global Finance Service zusammenarbeitet. Er arbeitet mit Kunden zusammen, um innovative Lösungen zu entwickeln, die die geschäftlichen Probleme der Kunden angehen und die Einführung von AWS-Services beschleunigen. In seiner Freizeit liest, läuft und reist Donald gerne.

Zeitstempel:

Mehr von AWS Maschinelles Lernen