Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelinesissä

Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelinesissä

Amazon SageMaker Studio voi auttaa sinua rakentamaan, kouluttamaan, korjaamaan, ottamaan käyttöön ja valvomaan mallejasi sekä hallitsemaan koneoppimisen työnkulkuja. Amazon SageMaker -putkistot avulla voit rakentaa a turvallinen, skaalautuva ja joustava MLOps-alusta Studion sisällä.

Tässä viestissä selitämme, kuinka PySpark-käsittelytöitä suoritetaan liukuhihnassa. Tämä mahdollistaa sen, että jokainen, joka haluaa kouluttaa mallin Pipelinesillä, voi myös esikäsitellä koulutusdataa, jälkikäsittelyä johtopäätösdataa tai arvioida malleja PySparkin avulla. Tämä ominaisuus on erityisen tärkeä silloin, kun sinun on käsiteltävä suuria tietoja. Lisäksi esittelemme, kuinka voit optimoida PySpark-vaiheesi konfiguraatioiden ja Spark UI -lokien avulla.

Putket ovat an Amazon Sage Maker työkalu päästä päähän ML-putkilinjojen rakentamiseen ja hallintaan. Se on täysin hallittu on-demand-palvelu, joka on integroitu SageMakeriin ja muihin AWS-palveluihin ja luo ja hallitsee resursseja puolestasi. Tämä varmistaa, että ilmentymiä käytetään vain putkilinjojen aikana. Lisäksi Pipelines tukee SageMaker Python SDK, jonka avulla voit seurata omaasi tietojen sukulinja ja käytä vaiheita uudelleen tallentamalla ne välimuistiin kehitysajan ja kustannusten helpottamiseksi. SageMaker-putkilinjaa voidaan käyttää käsittelyvaiheet käsitellä tietoja tai suorittaa mallin arviointia.

Suuren mittakaavan datan käsittelyssä datatieteilijät ja ML-insinöörit käyttävät usein PySpark, käyttöliittymä Apache Spark Pythonissa. SageMaker tarjoaa valmiiksi rakennettuja Docker-otoksia, jotka sisältävät PySparkin ja muita riippuvuuksia, joita tarvitaan hajautettujen tietojenkäsittelytöiden suorittamiseen, mukaan lukien datamuunnokset ja ominaisuussuunnittelu Spark-kehyksen avulla. Vaikka näiden kuvien avulla voit nopeasti aloittaa PySparkin käytön prosessointitöissä, laajamittainen tietojenkäsittely vaatii usein erityisiä Spark-kokoonpanoja, jotta SageMakerin luoman klusterin hajautettu laskenta voidaan optimoida.

Esimerkissämme luomme SageMaker-putkilinjan, joka suorittaa yhden käsittelyvaiheen. Lisätietoja siitä, mitä muita vaiheita voit lisätä liukuhihnaan, on kohdassa Putkilinjan vaiheet.

SageMaker Processing -kirjasto

SageMaker Processing voidaan suorittaa tietyillä puitteet (esimerkiksi SKlearnProcessor, PySparkProcessor tai Hugging Face). Riippumatta käytetystä kehyksestä, kukin Käsittelyvaihe vaatii seuraavaa:

  • Vaiheen nimi – SageMaker-liukuhihnavaiheen nimi
  • Vaiheargumentit – Argumentit puolestasi ProcessingStep

Lisäksi voit toimittaa seuraavat tiedot:

  • Askelvälimuistin määritykset, jotta vältytään tarpeettomilta vaiheiltasi SageMaker-liukuhihnassa
  • Luettelo vaiheiden nimistä, vaihe-esiintymistä tai vaihekokoelmaesiintymistä, jotka ProcessingStep riippuu
  • Kohteen näyttönimi ProcessingStep
  • Kuvaus ProcessingStep
  • Kiinteistötiedostot
  • Yritä käytäntöjä uudelleen

Argumentit luovutetaan ProcessingStep. Voit käyttää sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor luokkaa suorittaaksesi Spark-sovelluksesi käsittelytyön sisällä.

Jokaisella prosessorilla on omat tarpeensa kehyksestä riippuen. Tämä havainnollistetaan parhaiten käyttämällä PySparkProcessor, jossa voit välittää lisätietoja optimoidaksesi ProcessingStep edelleen, esimerkiksi kautta configuration parametri, kun suoritat työtäsi.

Suorita SageMaker Processing -työt suojatussa ympäristössä

Se on parhaiden käytäntöjen luodaksesi yksityisen Amazon VPC:n ja määrittääksesi sen niin, että työsi eivät ole käytettävissä julkisen Internetin kautta. SageMaker-käsittelytöiden avulla voit määrittää yksityiset aliverkot ja suojausryhmät VPC:ssäsi sekä ottaa käyttöön verkon eristyksen ja säilöjen välisen liikenteen salauksen käyttämällä NetworkConfig.VpcConfig pyynnön parametri CreateProcessingJob API. Tarjoamme esimerkkejä tästä kokoonpanosta käyttämällä SageMaker SDK seuraavassa osassa.

PySpark ProcessingStep SageMaker Pipelinesissä

Tässä esimerkissä oletamme, että Studio on otettu käyttöön suojatussa ympäristössä, joka on jo saatavilla, mukaan lukien VPC, VPC-päätepisteet, suojausryhmät, AWS-henkilöllisyyden ja käyttöoikeuksien hallinta (IAM) roolit ja AWS-avainhallintapalvelu (AWS KMS) avaimet. Oletamme myös, että sinulla on kaksi ryhmää: toinen artefakteille, kuten koodille ja lokeille, ja toinen tiedoillesi. The basic_infra.yaml tiedosto tarjoaa esimerkin AWS-pilven muodostuminen koodi tarvittavan edellytysinfrastruktuurin tarjoamiseksi. Esimerkkikoodi ja käyttöönottoopas ovat myös saatavilla osoitteessa GitHub.

Esimerkkinä määritimme liukuhihnan, joka sisältää yhden ProcessingStep jossa vain luemme ja kirjoitamme abalone-tietojoukko käyttämällä Sparkia. Koodiesimerkit näyttävät, kuinka voit määrittää ja määrittää ProcessingStep.

Määrittelemme liukuhihnan parametrit (nimi, rooli, ryhmät ja niin edelleen) ja vaihekohtaiset asetukset (instanssityyppi ja määrä, kehysversio ja niin edelleen). Tässä esimerkissä käytämme suojattua asennusta ja määritämme myös aliverkot, suojausryhmät ja säilöjen välisen liikenteen salauksen. Tässä esimerkissä tarvitset liukuhihnan suoritusroolin, jossa on SageMakerin täydet käyttöoikeudet ja VPC. Katso seuraava koodi:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Osoittaakseen seuraava koodiesimerkki ajaa PySpark-komentosarjan SageMaker Processingissa liukuhihnassa käyttämällä PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Kuten edellisessä koodissa näkyy, korvaamme Spark-oletuskokoonpanot tarjoamalla configuration.json kuten ProcessingInput. Käytämme a configuration.json tiedosto, johon on tallennettu Amazonin yksinkertainen tallennuspalvelu (Amazon S3) seuraavilla asetuksilla:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Voimme päivittää Spark-oletuskokoonpanon joko välittämällä tiedoston muodossa a ProcessingInput tai käyttämällä konfigurointiargumenttia suoritettaessa run() toiminto.

Spark-kokoonpano riippuu muista vaihtoehdoista, kuten prosessointityölle valitusta ilmentymän tyypistä ja ilmentymien määrästä. Ensimmäinen huomio on ilmentymien määrä, kunkin esiintymän vCPU-ytimet ja ilmentymän muisti. Voit käyttää Spark-käyttöliittymät or CloudWatch-instanssimittarit ja lokit näiden arvojen kalibroimiseksi useiden ajoiteraatioiden aikana.

Lisäksi suoritin- ja ajuriasetuksia voidaan optimoida entisestään. Katso esimerkki näiden laskemisesta Parhaat käytännöt Apache Spark -sovellusten muistin hallintaan onnistuneesti Amazon EMR:ssä.

Seuraavaksi suosittelemme ohjaimen ja suorittimen asetusten tutkimista committer-asetusten parantamiseksi suorituskyvyn parantamiseksi, kun kirjoitat Amazon S3:een. Meidän tapauksessamme kirjoitamme Parkettitiedostoja Amazon S3:een ja asetamme "spark.sql.parquet.fs.optimized.comitter.optimization-enabled” totta.

Tarvittaessa yhteyttä Amazon S3:een, alueelliseen päätepisteeseen "spark.hadoop.fs.s3a.endpoint” voidaan määrittää asetustiedostossa.

Tässä esimerkkiputkessa PySpark-komentosarja spark_process.py (kuten seuraavassa koodissa näkyy) lataa CSV-tiedoston Amazon S3:sta Spark-tietokehykseen ja tallentaa tiedot Parquet-muodossa takaisin Amazon S3:een.

Huomaa, että esimerkkikokoonpanomme ei ole oikeassa suhteessa työmäärään, koska abalone-tietojoukon lukeminen ja kirjoittaminen voidaan tehdä oletusasetuksissa yhdessä esiintymässä. Mainitsemamme kokoonpanot tulisi määrittää erityistarpeidesi perusteella.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Voit sukeltaa Spark-käsittelytöiden optimointiin käyttämällä CloudWatch-lokeja sekä Spark-käyttöliittymää. Voit luoda Spark-käyttöliittymän suorittamalla käsittelytyön SageMaker-muistikirjan ilmentymässä. Voit tarkastella Spark-käyttöliittymä liukuhihnassa suoritettaville käsittelytöille by käynnissä historiapalvelin SageMaker-muistikirjan esiintymän sisällä, jos Spark UI -lokit on tallennettu samaan Amazon S3 -sijaintiin.

Puhdistaa

Jos noudatit opetusohjelmaa, on hyvä käytäntö poistaa resurssit, joita ei enää käytetä, jotta kulujen kertyminen loppuisi. Varmista poista CloudFormation-pino joita käytit resurssien luomiseen. Tämä poistaa luodun pinon sekä sen luomat resurssit.

Yhteenveto

Tässä viestissä osoitimme kuinka suorittaa suojattu SageMaker Processing -työ PySparkilla SageMaker Pipelinesissä. Osoitimme myös, kuinka PySpark voidaan optimoida Spark-kokoonpanojen avulla ja määrittää käsittelytyösi toimimaan suojatussa verkkokokoonpanossa.

Seuraavana askeleena tutkia, kuinka ja miten voit automatisoida koko mallin elinkaaren asiakkaat rakensivat turvallisia ja skaalautuvia MLOps-alustoja käyttämällä SageMaker-palveluita.


Tietoja Tekijät

Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelines PlatoBlockchain Data Intelligencessä. Pystysuuntainen haku. Ai.Maren Suilmann on datatieteilijä osoitteessa AWS-asiantuntijapalvelut. Hän työskentelee asiakkaiden kanssa eri toimialoilla paljastaen AI/ML:n tehon saavuttaakseen heidän liiketoimintansa. Maren on ollut AWS:ssä marraskuusta 2019 lähtien. Vapaa-ajallaan hän harrastaa potkunyrkkeilyä, vaellusta upeille näkymille ja lautapeliiltoja.


Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelines PlatoBlockchain Data Intelligencessä. Pystysuuntainen haku. Ai.Maira Ladeira Tanke
on AWS:n ML-asiantuntija. Hänellä on datatieteen tausta, ja hänellä on 9 vuoden kokemus ML-sovellusten suunnittelusta ja rakentamisesta asiakkaiden kanssa eri toimialoilla. Teknisenä johtajana hän auttaa asiakkaita nopeuttamaan liiketoiminnan arvon saavuttamista uusien teknologioiden ja innovatiivisten ratkaisujen avulla. Vapaa-ajallaan Maira matkustaa ja viettää aikaa perheen kanssa jossain lämpimässä paikassa.


Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelines PlatoBlockchain Data Intelligencessä. Pystysuuntainen haku. Ai.Pauline Ting
on Data Scientist vuonna AWS-asiantuntijapalvelut tiimi. Hän tukee asiakkaita heidän liiketoimintansa saavuttamisessa ja nopeuttamisessa kehittämällä AI/ML-ratkaisuja. Vapaa-ajallaan Pauline nauttii matkustamisesta, surffaamisesta ja uusien jälkiruokapaikkojen kokeilemisesta.


Suorita suojattuja käsittelytöitä PySparkilla Amazon SageMaker Pipelines PlatoBlockchain Data Intelligencessä. Pystysuuntainen haku. Ai.Donald Fossouo
on Sr Data Arkkitehti AWS-asiantuntijapalvelut tiimi, joka työskentelee enimmäkseen Global Finance Servicen kanssa. Hän tekee yhteistyötä asiakkaiden kanssa luodakseen innovatiivisia ratkaisuja, jotka ratkaisevat asiakkaiden liiketoiminnan ongelmia ja nopeuttavat AWS-palvelujen käyttöönottoa. Vapaa-ajallaan Donald nauttii lukemisesta, juoksemisesta ja matkustamisesta.

Aikaleima:

Lisää aiheesta AWS-koneoppiminen