Uruchamiaj zadania bezpiecznego przetwarzania przy użyciu PySpark w Amazon SageMaker Pipelines

Uruchamiaj zadania bezpiecznego przetwarzania przy użyciu PySpark w Amazon SageMaker Pipelines

Studio Amazon SageMaker może pomóc w tworzeniu, szkoleniu, debugowaniu, wdrażaniu i monitorowaniu modeli oraz zarządzaniu przepływami pracy uczenia maszynowego (ML). Rurociągi Amazon SageMaker pozwala zbudować bezpieczną, skalowalną i elastyczną platformę MLOps w ramach Studia.

W tym poście wyjaśniamy, jak uruchamiać zadania przetwarzania PySpark w ramach potoku. Dzięki temu każdy, kto chce trenować model przy użyciu potoków, może również wstępnie przetwarzać dane szkoleniowe, przetwarzać dane wnioskowania lub oceniać modele przy użyciu PySpark. Ta funkcja jest szczególnie przydatna, gdy trzeba przetwarzać dane na dużą skalę. Ponadto pokazujemy, jak zoptymalizować kroki PySpark za pomocą konfiguracji i dzienników interfejsu użytkownika platformy Spark.

Rurociągi to Amazon Sage Maker narzędzie do budowania i zarządzania kompleksowymi potokami ML. Jest to w pełni zarządzana usługa na żądanie, zintegrowana z SageMaker i innymi usługami AWS, dzięki czemu tworzy i zarządza zasobami za Ciebie. Dzięki temu instancje są udostępniane i używane tylko podczas uruchamiania potoków. Ponadto Pipelines jest obsługiwany przez SageMaker SDK dla Pythona, umożliwiając śledzenie rodowód danych i kroki ponownego użycia poprzez buforowanie ich w celu skrócenia czasu i kosztów programowania. Potok SageMaker może używać etapy przetwarzania w celu przetwarzania danych lub przeprowadzenia oceny modelu.

Podczas przetwarzania danych na dużą skalę często korzystają z nich analitycy danych i inżynierowie ML PySpark, interfejs do Apache Spark w Pythonie. SageMaker zapewnia gotowe obrazy Dockera, które obejmują PySpark i inne zależności potrzebne do uruchamiania rozproszonych zadań przetwarzania danych, w tym transformacji danych i inżynierii funkcji przy użyciu platformy Spark. Chociaż te obrazy pozwalają szybko rozpocząć korzystanie z PySpark w zadaniach przetwarzania, przetwarzanie danych na dużą skalę często wymaga określonych konfiguracji Spark w celu optymalizacji rozproszonego przetwarzania klastra utworzonego przez SageMaker.

W naszym przykładzie tworzymy potok SageMaker wykonujący pojedynczy krok przetwarzania. Aby uzyskać więcej informacji o innych krokach, które można dodać do potoku, zobacz Kroki rurociągu.

Biblioteka przetwarzania SageMaker

Przetwarzanie SageMaker może działać z określonymi Ramy (na przykład SKlearnProcessor, PySparkProcessor lub Hugging Face). Niezależnie od użytego frameworka, każdy Przetwarzanie Krok wymaga następujących elementów:

  • Nazwa kroku – Nazwa, która ma być używana dla kroku potoku SageMaker
  • Argumenty krokowe – Argumenty za ProcessingStep

Dodatkowo możesz podać:

  • Konfiguracja pamięci podręcznej kroków w celu uniknięcia niepotrzebnych uruchomień kroku w potoku SageMaker
  • Lista nazw kroków, instancji kroków lub instancji kolekcji kroków, które ProcessingStep zależy
  • Wyświetlana nazwa ProcessingStep
  • Opis ProcessingStep
  • Pliki własności
  • Ponów zasady

Argumenty są przekazywane do ProcessingStep. Możesz użyć sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class, aby uruchomić aplikację Spark w zadaniu przetwarzania.

Każdy procesor ma swoje własne potrzeby, w zależności od struktury. Najlepiej ilustruje to użycie tzw PySparkProcessor, gdzie możesz przekazać dodatkowe informacje w celu optymalizacji ProcessingStep dalej, na przykład przez configuration parametr podczas uruchamiania zadania.

Uruchamiaj zadania SageMaker Processing w bezpiecznym środowisku

Jego najlepsze praktyki stworzyć prywatną Amazon VPC i skonfigurować ją tak, aby Twoje zadania nie były dostępne przez publiczny Internet. Zadania przetwarzania SageMaker umożliwiają określenie prywatnych podsieci i grup zabezpieczeń w VPC, a także umożliwiają izolację sieci i szyfrowanie ruchu między kontenerami za pomocą NetworkConfig.VpcConfig zażądać parametru CreateProcessingJob API. Podajemy przykłady takiej konfiguracji przy użyciu pliku SDK SageMakera w następnej sekcji.

PySpark ProcessingStep w SageMaker Pipelines

W tym przykładzie zakładamy, że masz już wdrożone Studio w bezpiecznym środowisku, w tym VPC, punkty końcowe VPC, grupy zabezpieczeń, AWS Zarządzanie tożsamością i dostępem (IAM) role i Usługa zarządzania kluczami AWS (AWS KMS). Zakładamy również, że masz dwa zasobniki: jeden na artefakty, takie jak kod i dzienniki, a drugi na dane. The basic_infra.yaml plik zawiera przykład Tworzenie chmury AWS kod w celu udostępnienia niezbędnej infrastruktury wstępnej. Przykładowy kod i przewodnik wdrażania są również dostępne na GitHub.

Jako przykład stworzyliśmy potok zawierający pojedynczą ProcessingStep w którym po prostu czytamy i piszemy zbiór danych abalone za pomocą Sparka. Przykłady kodu pokazują, jak skonfigurować i skonfigurować ProcessingStep.

Definiujemy parametry potoku (nazwa, rola, zasobniki itd.) oraz ustawienia specyficzne dla kroku (typ i liczba instancji, wersja frameworka itd.). W tym przykładzie używamy bezpiecznej konfiguracji, a także definiujemy podsieci, grupy zabezpieczeń i szyfrowanie ruchu między kontenerami. W tym przykładzie potrzebujesz roli wykonania potoku z pełnym dostępem do SageMaker i VPC. Zobacz następujący kod:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Aby zademonstrować, poniższy przykład kodu uruchamia skrypt PySpark w SageMaker Processing w ramach potoku przy użyciu PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Jak pokazano w powyższym kodzie, zastępujemy domyślne konfiguracje platformy Spark, udostępniając configuration.json jak ProcessingInput. używamy configuration.json plik, w którym został zapisany Usługa Amazon Simple Storage (Amazon S3) z następującymi ustawieniami:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Możemy zaktualizować domyślną konfigurację platformy Spark, przekazując plik jako ProcessingInput lub używając argumentu konfiguracji podczas uruchamiania run() funkcja.

Konfiguracja platformy Spark zależy od innych opcji, takich jak typ wystąpienia i liczba wystąpień wybranych dla zadania przetwarzania. Pierwszym czynnikiem, który należy wziąć pod uwagę, jest liczba instancji, rdzenie vCPU, które mają każda z tych instancji, oraz pamięć instancji. Możesz użyć Interfejsy Spark or Metryki instancji CloudWatch i rejestruje, aby skalibrować te wartości w wielu iteracjach przebiegu.

Ponadto ustawienia executora i sterownika mogą być jeszcze bardziej zoptymalizowane. Aby zapoznać się z przykładem sposobu ich obliczania, patrz Najlepsze praktyki skutecznego zarządzania pamięcią dla aplikacji Apache Spark w Amazon EMR.

Następnie, w przypadku ustawień sterownika i executora, zalecamy zbadanie ustawień zatwierdzającego, aby poprawić wydajność podczas zapisywania do Amazon S3. W naszym przypadku zapisujemy pliki Parquet na Amazon S3 i ustawiamy „spark.sql.parquet.fs.optimized.comitter.optimization-enabled”Do prawdy.

W razie potrzeby do połączenia z Amazon S3, regionalny punkt końcowy „spark.hadoop.fs.s3a.endpoint” można określić w pliku konfiguracyjnym.

W tym przykładowym potoku skrypt PySpark spark_process.py (jak pokazano w poniższym kodzie) ładuje plik CSV z Amazon S3 do ramki danych Spark i zapisuje dane jako Parquet z powrotem do Amazon S3.

Należy zauważyć, że nasza przykładowa konfiguracja nie jest proporcjonalna do obciążenia, ponieważ odczyt i zapis zestawu danych abalone można wykonać na ustawieniach domyślnych w jednej instancji. Konfiguracje, o których wspomnieliśmy, powinny być zdefiniowane na podstawie konkretnych potrzeb.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Aby zagłębić się w optymalizację zadań przetwarzania Spark, możesz użyć dzienników CloudWatch oraz interfejsu użytkownika Spark. Interfejs użytkownika platformy Spark można utworzyć, uruchamiając zadanie przetwarzania w instancji notatnika SageMaker. Możesz obejrzeć Interfejs użytkownika platformy Spark dla zadań przetwarzania działających w ramach potoku by uruchomienie serwera historii w instancji notatnika SageMaker, jeśli dzienniki interfejsu użytkownika Spark zostały zapisane w tej samej lokalizacji Amazon S3.

Sprzątać

Jeśli postępowałeś zgodnie z samouczkiem, dobrą praktyką jest usuwanie zasobów, które nie są już używane, aby zatrzymać naliczanie opłat. Upewnij się usuń stos CloudFormation którego użyłeś do stworzenia swoich zasobów. Spowoduje to usunięcie utworzonego stosu oraz utworzonych przez niego zasobów.

Wnioski

W tym poście pokazaliśmy, jak uruchomić bezpieczne zadanie SageMaker Processing przy użyciu PySpark w SageMaker Pipelines. Pokazaliśmy również, jak zoptymalizować PySpark przy użyciu konfiguracji Spark i skonfigurować zadanie przetwarzania do uruchamiania w bezpiecznej konfiguracji sieciowej.

W następnym kroku zbadaj, jak zautomatyzować cały cykl życia modelu i jak to zrobić klientów stworzyło bezpieczne i skalowalne platformy MLOps korzystanie z usług SageMaker.


O autorach

Uruchamiaj bezpieczne zadania przetwarzania za pomocą PySpark w Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Maren Suilmann jest naukowcem zajmującym się danymi w Usługi profesjonalne AWS. Pracuje z klientami z różnych branż, ujawniając moc sztucznej inteligencji/uczenia maszynowego w osiąganiu wyników biznesowych. Maren jest w AWS od listopada 2019 roku. W wolnym czasie lubi kickboxing, piesze wędrówki z pięknymi widokami i wieczory z grami planszowymi.


Uruchamiaj bezpieczne zadania przetwarzania za pomocą PySpark w Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Maira Ladeira Tanke
jest specjalistą ML w AWS. Z doświadczeniem w nauce o danych ma 9 lat doświadczenia w architekturze i budowaniu aplikacji ML dla klientów z różnych branż. Jako lider techniczny pomaga klientom przyspieszyć osiąganie przez nich wartości biznesowej dzięki pojawiającym się technologiom i innowacyjnym rozwiązaniom. W wolnym czasie Maira lubi podróżować i spędzać czas z rodziną w ciepłych miejscach.


Uruchamiaj bezpieczne zadania przetwarzania za pomocą PySpark w Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Paulina Ting
jest Data Scientist w Usługi profesjonalne AWS zespół. Wspiera klientów w osiąganiu i przyspieszaniu ich wyników biznesowych poprzez tworzenie rozwiązań AI/ML. W wolnym czasie Pauline lubi podróżować, surfować i próbować nowych deserów.


Uruchamiaj bezpieczne zadania przetwarzania za pomocą PySpark w Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Wyszukiwanie pionowe. AI.Donalda Fossouo
jest starszym architektem danych w Usługi profesjonalne AWS zespół, głównie współpracujący z Global Finance Service. Współpracuje z klientami w celu tworzenia innowacyjnych rozwiązań, które rozwiązują problemy biznesowe klientów i przyspieszają wdrażanie usług AWS. W wolnym czasie Donald lubi czytać, biegać i podróżować.

Znak czasu:

Więcej z Uczenie maszynowe AWS