Käivitage turvalisi töötlemistöid PySparki abil rakenduses Amazon SageMaker Pipelines

Käivitage turvalisi töötlemistöid PySparki abil rakenduses Amazon SageMaker Pipelines

Amazon SageMaker Studio aitab teil mudeleid luua, koolitada, siluda, juurutada ja jälgida ning hallata masinõppe (ML) töövooge. Amazon SageMakeri torujuhtmed võimaldab teil ehitada a turvaline, skaleeritav ja paindlik MLOps-platvorm Stuudio sees.

Selles postituses selgitame, kuidas PySparki töötlemistöid torustikus käitada. See võimaldab kõigil, kes soovivad Pipelinesi abil mudelit koolitada, ka eeltöötleda koolitusandmeid, järeltöötlusandmeid või hinnata PySparki abil mudeleid. See võimalus on eriti oluline, kui peate töötlema suuremahulisi andmeid. Lisaks tutvustame, kuidas optimeerida PySparki samme konfiguratsioonide ja Spark UI logide abil.

Torujuhtmed on an Amazon SageMaker tööriist otsast lõpuni ML-konveierite ehitamiseks ja haldamiseks. See on täielikult hallatav tellitav teenus, mis on integreeritud SageMakeri ja teiste AWS-teenustega ning loob ja haldab seetõttu teie eest ressursse. See tagab, et eksemplare varustatakse ja kasutatakse ainult torujuhtmete käitamisel. Lisaks toetab torujuhtmeid SageMaker Python SDK, mis võimaldab teil oma andmete põlvnemine ja samme uuesti kasutada salvestades need vahemällu, et hõlbustada arendusaega ja -kulusid. Kasutada saab SageMakeri torujuhet töötlemise etapid andmete töötlemiseks või mudeli hindamiseks.

Suuremahuliste andmete töötlemisel kasutavad andmeteadlased ja ML-insenerid sageli PySpark, liides Apache Spark Pythonis. SageMaker pakub eelehitatud Dockeri kujutisi, mis sisaldavad PySparki ja muid sõltuvusi, mis on vajalikud hajutatud andmetöötlustööde käitamiseks, sealhulgas andmete teisendused ja funktsioonide projekteerimine Sparki raamistiku abil. Kuigi need pildid võimaldavad teil PySparki töötlemistöödel kiiresti kasutama hakata, nõuab suuremahuline andmetöötlus sageli spetsiifilisi Sparki konfiguratsioone, et optimeerida SageMakeri loodud klastri hajutatud andmetöötlust.

Meie näites loome SageMakeri konveieri, mis töötab ühte töötlemisetappi. Lisateavet selle kohta, milliseid muid toiminguid saate torujuhtmele lisada, vaadake jaotisest Torujuhtme sammud.

SageMakeri töötlemise teek

SageMaker Processing võib töötada konkreetsega raamistikud (näiteks SKlearnProcessor, PySparkProcessor või Hugging Face). Sõltumata kasutatud raamistikust, igaüks Töötlemise samm nõuab järgmist:

  • Sammu nimi – SageMakeri torujuhtme sammu jaoks kasutatav nimi
  • Sammuargumendid – Argumendid teie poolt ProcessingStep

Lisaks saate pakkuda järgmist.

  • Teie sammu vahemälu konfiguratsioon, et vältida teie sammu tarbetut käitamist SageMakeri konveieris
  • Sammunimede, sammude eksemplaride või sammukogumiku eksemplaride loend, mida ProcessingStep sõltub
  • Kuvatav nimi ProcessingStep
  • Kirjeldus ProcessingStep
  • Omandi failid
  • Proovige eeskirju uuesti

Argumendid antakse üle ProcessingStep. Võite kasutada sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor klassis, et käitada oma Sparki rakendust töötlemistöö sees.

Igal protsessoril on sõltuvalt raamistikust oma vajadused. Seda saab kõige paremini illustreerida kasutades PySparkProcessor, kuhu saate edastada lisateavet, et optimeerida ProcessingStep edasi, näiteks kaudu configuration parameetrit oma töö tegemisel.

Käivitage SageMakeri töötlemistöid turvalises keskkonnas

See on parimate tavade privaatse Amazon VPC loomiseks ja selle konfigureerimiseks nii, et teie töökohad poleks avaliku Interneti kaudu juurdepääsetavad. SageMakeri töötlemistööd võimaldavad teil määrata oma VPC privaatsed alamvõrgud ja turberühmad ning lubada võrgu eraldamist ja konteineritevahelise liikluse krüptimist, kasutades NetworkConfig.VpcConfig päringu parameeter CreateProcessingJob API. Pakume selle konfiguratsiooni näiteid kasutades SageMaker SDK järgmises jaotises.

PySpark ProcessingStep SageMakeri torujuhtmetes

Selle näite puhul eeldame, et teil on Studio juurutatud turvalises keskkonnas, mis on juba saadaval, sealhulgas VPC, VPC lõpp-punktid, turberühmad, AWS-i identiteedi- ja juurdepääsuhaldus (IAM) rollid ja AWS-i võtmehaldusteenus (AWS KMS) võtmed. Samuti eeldame, et teil on kaks ämbrit: üks artefaktide jaoks, nagu kood ja logid, ning teine ​​teie andmete jaoks. The basic_infra.yaml fail pakub näidet AWS CloudFormation kood vajaliku eeltingimuse infrastruktuuri loomiseks. Näidiskood ja juurutusjuhend on saadaval ka aadressil GitHub.

Näitena seadistame konveieri, mis sisaldab singlit ProcessingStep milles me lihtsalt loeme ja kirjutame abalone andmestik kasutades Sparki. Koodinäidised näitavad, kuidas seadistada ja konfigureerida ProcessingStep.

Määratleme konveieri parameetrid (nimi, roll, ämbrid ja nii edasi) ja etapipõhised sätted (eksemplari tüüp ja arv, raamistiku versioon jne). Selles näites kasutame turvalist seadistust ja määratleme ka alamvõrgud, turvarühmad ja konteineritevahelise liikluse krüptimise. Selle näite jaoks vajate SageMakeri täieliku juurdepääsu ja VPC-ga konveieri täitmise rolli. Vaadake järgmist koodi:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Demonstreerimiseks käivitab järgmine koodinäide PySparki skripti SageMaker Processingis torujuhtme sees, kasutades PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Nagu on näidatud eelmises koodis, kirjutame Sparki vaikekonfiguratsioonid üle, pakkudes configuration.json kui ProcessingInput. Kasutame a configuration.json faili, mis salvestati Amazoni lihtne salvestusteenus (Amazon S3) järgmiste seadistustega:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Saame värskendada Sparki vaikekonfiguratsiooni, edastades faili kui a ProcessingInput või kasutades konfiguratsiooniargumenti käivitamisel run() funktsiooni.

Sparki konfiguratsioon sõltub muudest suvanditest, nagu töötlemistöö jaoks valitud eksemplari tüüp ja eksemplaride arv. Esimene kaalutlus on eksemplaride arv, vCPU tuumad, mis neil eksemplaridel on, ja eksemplari mälu. Sa võid kasutada Spark kasutajaliidesed or CloudWatchi eksemplari mõõdikud ja logib nende väärtuste kalibreerimiseks mitme käitamise iteratsiooni jooksul.

Lisaks saab täituri ja draiveri sätteid veelgi optimeerida. Näide selle kohta, kuidas neid arvutada, vt Parimad tavad Apache Sparki rakenduste mälu edukaks haldamiseks Amazon EMR-is.

Järgmisena soovitame draiveri ja täitja seadete puhul uurida sidumisseadeid, et parandada jõudlust Amazon S3-le kirjutamisel. Meie puhul kirjutame Parketi failid Amazon S3-sse ja seadistame "spark.sql.parquet.fs.optimized.comitter.optimization-enabled” tõeseks.

Kui see on vajalik Amazon S3-ga ühenduse loomiseks, on piirkondlik lõpp-punkt "spark.hadoop.fs.s3a.endpoint” saab määrata konfiguratsioonifailis.

Selles näitekonveieris PySparki skript spark_process.py (nagu on näidatud järgmises koodis) laadib Amazon S3-st CSV-faili Sparki andmeraami ja salvestab andmed parketina tagasi Amazon S3-sse.

Pange tähele, et meie näidiskonfiguratsioon ei ole töökoormusega proportsionaalne, kuna abalone andmestiku lugemist ja kirjutamist saab teha ühel eksemplaril vaikeseadetega. Mainitud konfiguratsioonid tuleks määratleda teie konkreetsete vajaduste põhjal.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Sparki töötlemistööde optimeerimisse sukeldumiseks võite kasutada nii CloudWatchi logisid kui ka Sparki kasutajaliidest. Sparki kasutajaliidese saate luua, käivitades SageMakeri sülearvuti eksemplaris töötlemistöö. Saate vaadata Sparki kasutajaliides konveieri raames töötavate töötlemistööde jaoks by ajalooserveri käivitamine SageMakeri sülearvuti eksemplaris, kui Spark UI logid salvestati samasse Amazon S3 asukohta.

Koristage

Kui järgisite õpetust, on hea tava tasude peatamiseks kustutada ressursid, mida enam ei kasutata. Tehke kindlasti kustutage CloudFormationi virn mida kasutasite oma ressursside loomiseks. See kustutab nii loodud virna kui ka selle loodud ressursid.

Järeldus

Selles postituses näitasime, kuidas SageMakeri torujuhtmetes PySparki abil turvalist SageMakeri töötlemistööd käitada. Samuti näitasime, kuidas optimeerida PySparki Sparki konfiguratsioonide abil ja seadistada teie töötlemistöö töötama turvalises võrgukonfiguratsioonis.

Järgmise sammuna uurige, kuidas ja kuidas kogu mudeli elutsüklit automatiseerida kliendid ehitasid turvalised ja skaleeritavad MLOps-platvormid kasutades SageMakeri teenuseid.


Autoritest

Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maren Suilmann on andmeteadlane aadressil AWS-i professionaalsed teenused. Ta teeb koostööd klientidega erinevates tööstusharudes, tutvustades tehisintellekti/ML-i võimsust nende äritulemuste saavutamiseks. Maren on AWS-is töötanud alates 2019. aasta novembrist. Vabal ajal naudib ta kickpoksi, matkamist suurepäraste vaadete juurde ja lauamänguõhtuid.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maira Ladeira Tanke
on AWS-i ML-spetsialist. Andmeteaduse taustaga on tal 9-aastane kogemus ML-rakenduste kujundamisel ja ehitamisel koos klientidega erinevatest tööstusharudest. Tehnilise juhina aitab ta klientidel kiirendada oma äriväärtuse saavutamist uute tehnoloogiate ja uuenduslike lahenduste kaudu. Vabal ajal meeldib Mairale reisida ja perega soojas kohas aega veeta.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pauline Ting
on andmeteadlane AWS-i professionaalsed teenused meeskond. Ta toetab kliente nende äritulemuste saavutamisel ja kiirendamisel, arendades AI/ML lahendusi. Vabal ajal naudib Pauline reisimist, surfamist ja uute magustoidukohtade proovimist.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Donald Fossouo
on Sr andmearhitekt AWS-i professionaalsed teenused meeskond, kes töötab peamiselt Global Finance Service'iga. Ta teeb koostööd klientidega, et luua uuenduslikke lahendusi, mis lahendavad klientide äriprobleeme ja kiirendavad AWS-teenuste kasutuselevõttu. Vabal ajal naudib Donald lugemist, jooksmist ja reisimist.

Ajatempel:

Veel alates AWS-i masinõpe