Futtasson biztonságos feldolgozási feladatokat a PySpark használatával az Amazon SageMaker Pipelinesben

Futtasson biztonságos feldolgozási feladatokat a PySpark használatával az Amazon SageMaker Pipelinesben

Amazon SageMaker Studio segíthet a modellek felépítésében, betanításában, hibakeresésében, üzembe helyezésében és figyelésében, valamint a gépi tanulási (ML) munkafolyamatok kezelésében. Amazon SageMaker csővezetékek lehetővé teszi az a biztonságos, méretezhető és rugalmas MLOps platform a Stúdión belül.

Ebben a bejegyzésben elmagyarázzuk, hogyan lehet PySpark feldolgozási feladatokat futtatni egy folyamaton belül. Ez lehetővé teszi, hogy bárki, aki modellt szeretne betanítani a Pipelines használatával, előfeldolgozza a betanítási adatokat, utófeldolgozási következtetési adatokat, vagy kiértékelje a modelleket a PySpark használatával. Ez a képesség különösen fontos, ha nagyméretű adatokat kell feldolgoznia. Ezenkívül bemutatjuk, hogyan optimalizálhatja PySpark lépéseit konfigurációk és Spark UI naplók segítségével.

A csővezetékek egy Amazon SageMaker eszköz végpontok közötti ML-folyamatok felépítéséhez és kezeléséhez. Ez egy teljesen felügyelt igény szerinti szolgáltatás, amely integrálva van a SageMakerrel és más AWS-szolgáltatásokkal, és ezért erőforrásokat hoz létre és kezel az Ön számára. Ez biztosítja, hogy a példányok csak a folyamatok futtatásakor legyenek kiépítve és használatban. Továbbá a Pipelines-t támogatja a SageMaker Python SDK, így nyomon követheti adatsor és a lépések újrafelhasználása gyorsítótárba helyezésével a fejlesztési idő és költség csökkentése érdekében. A SageMaker csővezeték használható feldolgozási lépések adatok feldolgozására vagy modellértékelés elvégzésére.

A nagyméretű adatok feldolgozásakor az adattudósok és az ML mérnökök gyakran használják PySpark, interfész a számára Apache Spark Pythonban. A SageMaker előre elkészített Docker-képfájlokat biztosít, amelyek PySparkot és más függőséget tartalmaznak, amelyek az elosztott adatfeldolgozási feladatok futtatásához szükségesek, beleértve az adatátalakításokat és a Spark keretrendszer használatával történő szolgáltatástervezést. Bár ezek a képek lehetővé teszik a PySpark gyors használatának megkezdését a feldolgozási feladatokban, a nagyméretű adatfeldolgozás gyakran speciális Spark-konfigurációkat igényel a SageMaker által létrehozott fürt elosztott számításának optimalizálása érdekében.

Példánkban egyetlen feldolgozási lépést futtató SageMaker folyamatot hozunk létre. További információért arról, hogy milyen további lépéseket adhat hozzá a folyamathoz, tekintse meg a következőt: A csővezeték lépései.

SageMaker feldolgozó könyvtár

A SageMaker Processing futtatható specifikus keretek (például SKlearnProcessor, PySparkProcessor vagy Hugging Face). A használt keretrendszertől függetlenül mindegyik ProcessingStep a következőket követeli meg:

  • Lépés neve – A SageMaker folyamatlépéshez használandó név
  • Lépésérvek – Az Ön mellett szóló érvek ProcessingStep

Ezenkívül a következőket is megadhatja:

  • A lépés gyorsítótárának konfigurációja a lépések szükségtelen futtatásának elkerülése érdekében a SageMaker folyamatban
  • A lépésnevek, lépéspéldányok vagy lépésgyűjtemény-példányok listája, amelyeket a ProcessingStep függ
  • A megjelenített neve a ProcessingStep
  • A ProcessingStep
  • Tulajdonságfájlok
  • Próbálja újra az irányelveket

Az érveket átadják a ProcessingStep. Használhatja a sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor osztályban a Spark alkalmazás futtatásához egy feldolgozási feladaton belül.

Minden processzornak megvannak a saját igényei, a keretrendszertől függően. Ezt a legjobban a PySparkProcessor, ahol további információkat adhat át a ProcessingStep tovább, például a configuration paramétert a munka futtatásakor.

Futtassa a SageMaker Processing feladatokat biztonságos környezetben

Ez legjobb gyakorlat létrehozhat egy privát Amazon VPC-t, és konfigurálhatja úgy, hogy munkái ne legyenek elérhetők a nyilvános interneten keresztül. A SageMaker feldolgozási feladatok lehetővé teszik a magán alhálózatok és biztonsági csoportok meghatározását a VPC-ben, valamint a hálózati elkülönítés és a konténerek közötti forgalom titkosításának engedélyezését a NetworkConfig.VpcConfig kérési paramétere CreateProcessingJob API. Példákat mutatunk be erre a konfigurációra a SageMaker SDK a következő részben.

PySpark ProcessingStep a SageMaker Pipelines-en belül

Ebben a példában feltételezzük, hogy a Studio már elérhető biztonságos környezetben van telepítve, beleértve a VPC-t, a VPC-végpontokat, a biztonsági csoportokat, AWS Identity and Access Management (IAM) szerepek, ill AWS kulcskezelési szolgáltatás (AWS KMS) billentyűket. Azt is feltételezzük, hogy két csoportja van: az egyik a műtermékeknek, például a kódnak és a naplóknak, a másik pedig az adatoknak. A basic_infra.yaml fájl példát mutat AWS felhőképződés kódot a szükséges előfeltétel infrastruktúra biztosításához. A példakód és az üzembe helyezési útmutató a webhelyen is elérhető GitHub.

Példaként felállítunk egy csővezetéket, amely egy singlet tartalmaz ProcessingStep amelyben egyszerűen csak olvassuk és írjuk a abalone adatkészlet Spark segítségével. A kódminták bemutatják, hogyan kell beállítani és konfigurálni a ProcessingStep.

Meghatározzuk a folyamat paramétereit (név, szerepkör, gyűjtőcsoportok és így tovább) és lépésspecifikus beállításokat (példánytípus és szám, keretverzió stb.). Ebben a példában biztonságos beállítást használunk, és alhálózatokat, biztonsági csoportokat és a tárolóközi forgalom titkosítását is meghatározzuk. Ebben a példában szükség van egy folyamatvégrehajtási szerepkörre SageMaker teljes hozzáféréssel és VPC-vel. Lásd a következő kódot:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Ennek bemutatása érdekében a következő kódpélda egy PySpark-szkriptet futtat a SageMaker Processing-on egy folyamaton belül a PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Amint az az előző kódban látható, felülírjuk az alapértelmezett Spark-konfigurációkat a biztosítással configuration.json mint ProcessingInput. Használjuk a configuration.json fájlba mentve Amazon egyszerű tárolási szolgáltatás (Amazon S3) a következő beállításokkal:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Frissíthetjük az alapértelmezett Spark konfigurációt úgy, hogy a fájlt a ProcessingInput vagy a konfigurációs argumentum használatával a futtatásakor run() funkciót.

A Spark konfigurációja más beállításoktól függ, például a feldolgozási feladathoz kiválasztott példánytípustól és példányszámtól. Az első szempont a példányok száma, az egyes példányok vCPU-magjai és a példánymemória. Te tudod használni Spark felhasználói felületek or CloudWatch-példánymutatók és naplózza ezeket az értékeket több futási iteráció során.

Ezen túlmenően a végrehajtó és az illesztőprogram beállításai még tovább optimalizálhatók. Ezek kiszámítására vonatkozó példát lásd: Bevált módszerek az Apache Spark-alkalmazások memóriájának sikeres kezeléséhez az Amazon EMR-en.

Ezután az illesztőprogram és a végrehajtó beállításainál javasoljuk, hogy vizsgálja meg a committer beállításokat, hogy javítsa a teljesítményt az Amazon S3-ra írva. Esetünkben a Parquet fájlokat írjuk az Amazon S3-ba, és beállítjuk a „spark.sql.parquet.fs.optimized.comitter.optimization-enabled” igazra.

Ha szükséges az Amazon S3-hoz, egy regionális végponthoz való csatlakozáshozspark.hadoop.fs.s3a.endpoint” a konfigurációs fájlban adható meg.

Ebben a példafolyamatban a PySpark szkript spark_process.py (ahogy a következő kódban látható) betölt egy CSV-fájlt az Amazon S3-ból egy Spark-adatkeretbe, és az adatokat Parquet-ként visszamenti az Amazon S3-ba.

Vegye figyelembe, hogy a példakonfigurációnk nem arányos a munkaterheléssel, mert az abalone adatkészlet olvasása és írása elvégezhető az alapértelmezett beállításokkal egy példányon. Az általunk említett konfigurációkat az Ön egyedi igényei alapján kell meghatározni.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

A Spark-feldolgozási feladatok optimalizálásához használhatja a CloudWatch naplóit, valamint a Spark UI-t. A Spark felhasználói felületet úgy hozhatja létre, hogy egy feldolgozási feladatot futtat egy SageMaker notebook példányon. Megtekintheti a Spark felhasználói felület a folyamatban futó feldolgozási feladatokhoz by fut az előzménykiszolgáló egy SageMaker-jegyzetfüzet-példányon belül, ha a Spark felhasználói felület naplóit ugyanazon az Amazon S3 helyen mentette.

Tisztítsuk meg

Ha követte az oktatóanyagot, célszerű törölni a már fel nem használt erőforrásokat a költségek megállítása érdekében. Győződjön meg róla, hogy törölje a CloudFormation veremet amelyeket az erőforrások létrehozásához használtál. Ezzel törli a létrehozott veremet és az általa létrehozott erőforrásokat.

Következtetés

Ebben a bejegyzésben megmutattuk, hogyan lehet biztonságos SageMaker feldolgozási feladatot futtatni PySpark használatával a SageMaker Pipelines-en belül. Azt is bemutattuk, hogyan optimalizálhatja a PySparkot a Spark-konfigurációk segítségével, és hogyan állíthatja be a feldolgozási feladatot, hogy biztonságos hálózati konfigurációban fusson.

Következő lépésként fedezze fel, hogyan és hogyan automatizálhatja a modell teljes életciklusát az ügyfelek biztonságos és méretezhető MLOps platformokat építettek SageMaker szolgáltatások használatával.


A szerzőkről

Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maren Suilmann adattudós a cégnél AWS professzionális szolgáltatások. A különböző iparágakban működő ügyfelekkel dolgozik, hogy feltárja az AI/ML erejét üzleti eredményeik elérésében. Maren 2019 novembere óta dolgozik az AWS-nél. Szabadidejében szeret kickboxozni, kirándulni a gyönyörű kilátásokhoz, és társasjáték-esteket tölt.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maira Ladeira Tanke
az AWS ML szakértője. Adattudományi háttérrel rendelkezik, és 9 éves tapasztalattal rendelkezik ML alkalmazások tervezésében és építésében különböző iparágakban lévő ügyfelekkel. Műszaki vezetőként segít az ügyfeleknek felgyorsítani az üzleti érték elérését a feltörekvő technológiák és innovatív megoldások révén. Szabadidejében Maira szívesen utazik, és a családjával tölti az idejét valami meleg helyen.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pauline Ting
a Data Scientist a AWS professzionális szolgáltatások csapat. AI/ML megoldások fejlesztésével támogatja ügyfeleit üzleti eredményük elérésében és felgyorsításában. Szabadidejében Pauline szívesen utazik, szörföz, és új desszert helyeket próbál ki.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Donald Fossouo
Sr Data Architect a AWS professzionális szolgáltatások csapat, többnyire a Global Finance Service munkatársai. Az ügyfelekkel együttműködve olyan innovatív megoldásokat hoz létre, amelyek kezelik az ügyfelek üzleti problémáit, és felgyorsítják az AWS-szolgáltatások alkalmazását. Szabadidejében Donald szeret olvasni, futni és utazni.

Időbélyeg:

Még több AWS gépi tanulás