Amazon SageMaker 파이프라인에서 PySpark를 사용하여 보안 처리 작업 실행

Amazon SageMaker 파이프라인에서 PySpark를 사용하여 보안 처리 작업 실행

아마존 세이지 메이커 스튜디오 모델을 구축, 교육, 디버그, 배포 및 모니터링하고 기계 학습(ML) 워크플로를 관리하는 데 도움이 될 수 있습니다. Amazon SageMaker 파이프 라인 구축할 수 있습니다. 안전하고 확장 가능하며 유연한 MLOps 플랫폼 스튜디오 내에서.

이 게시물에서는 파이프라인 내에서 PySpark 처리 작업을 실행하는 방법을 설명합니다. 이를 통해 파이프라인을 사용하여 모델을 교육하려는 사람은 누구나 교육 데이터를 사전 처리하거나 추론 데이터를 사후 처리하거나 PySpark를 사용하여 모델을 평가할 수 있습니다. 이 기능은 특히 대규모 데이터를 처리해야 하는 경우에 적합합니다. 또한 구성 및 Spark UI 로그를 사용하여 PySpark 단계를 최적화하는 방법을 보여줍니다.

파이프라인은 아마존 세이지 메이커 종단 간 ML 파이프라인을 구축하고 관리하기 위한 도구입니다. SageMaker 및 기타 AWS 서비스와 통합된 완전 관리형 온디맨드 서비스이므로 리소스를 생성하고 관리합니다. 이렇게 하면 파이프라인을 실행할 때만 인스턴스가 프로비저닝되고 사용됩니다. 또한 파이프라인은 SageMaker Python SDK, 당신이 당신을 추적하게 데이터 계보재사용 단계 개발 시간과 비용을 줄이기 위해 캐싱합니다. SageMaker 파이프라인은 다음을 사용할 수 있습니다. 처리 단계 데이터를 처리하거나 모델 평가를 수행합니다.

대규모 데이터를 처리할 때 데이터 과학자와 ML 엔지니어는 종종 파이 스파크, 인터페이스 아파치 스파크 파이썬에서. SageMaker는 Spark 프레임워크를 사용하여 데이터 변환 및 기능 엔지니어링을 포함하여 분산 데이터 처리 작업을 실행하는 데 필요한 PySpark 및 기타 종속성을 포함하는 사전 빌드된 Docker 이미지를 제공합니다. 이러한 이미지를 사용하면 처리 작업에서 PySpark를 사용하여 빠르게 시작할 수 있지만 대규모 데이터 처리에는 SageMaker에서 생성된 클러스터의 분산 컴퓨팅을 최적화하기 위해 특정 Spark 구성이 필요한 경우가 많습니다.

이 예에서는 단일 처리 단계를 실행하는 SageMaker 파이프라인을 생성합니다. 파이프라인에 추가할 수 있는 다른 단계에 대한 자세한 내용은 다음을 참조하십시오. 파이프라인 단계.

SageMaker 처리 라이브러리

SageMaker Processing은 특정 프레임 워크 (예: SKlearnProcessor, PySparkProcessor 또는 Hugging Face). 사용되는 프레임워크에 관계없이 각각 처리 단계 다음이 필요합니다.

  • 단계 이름 – SageMaker 파이프라인 단계에 사용할 이름
  • 단계 인수 – 귀하의 주장 ProcessingStep

또한 다음을 제공할 수 있습니다.

  • SageMaker 파이프라인에서 불필요한 단계 실행을 방지하기 위한 단계 캐시 구성
  • 단계 이름, 단계 인스턴스 또는 단계 콜렉션 인스턴스 목록 ProcessingStep 에 따라 달라집니다
  • 의 표시 이름 ProcessingStep
  • 에 대한 설명 ProcessingStep
  • 속성 파일
  • 재시도 정책

주장은 에게 넘겨진다. ProcessingStep. 당신은을 사용할 수 있습니다 sagemaker.spark.PySpark프로세서 or sagemaker.spark.SparkJar프로세서 처리 작업 내에서 Spark 애플리케이션을 실행하는 클래스입니다.

각 프로세서에는 프레임워크에 따라 고유한 요구 사항이 있습니다. 이것은 다음을 사용하여 가장 잘 설명됩니다. PySparkProcessor, 여기에서 추가 정보를 전달하여 최적화할 수 있습니다. ProcessingStep 추가로, 예를 들어 configuration 작업을 실행할 때 매개변수.

안전한 환경에서 SageMaker 처리 작업 실행

가장 좋은 방법 프라이빗 Amazon VPC를 생성하고 퍼블릭 인터넷을 통해 작업에 액세스할 수 없도록 구성합니다. SageMaker 처리 작업을 사용하면 VPC에서 프라이빗 서브넷 및 보안 그룹을 지정할 수 있을 뿐만 아니라 다음을 사용하여 네트워크 격리 및 컨테이너 간 트래픽 암호화를 활성화할 수 있습니다. NetworkConfig.VpcConfig 의 요청 매개변수 CreateProcessingJob API. 다음을 사용하여 이 구성의 예를 제공합니다. 세이지메이커 SDK 다음 절에서.

SageMaker 파이프라인 내의 PySpark 처리 단계

이 예에서는 VPC, VPC 엔드포인트, 보안 그룹, AWS 자격 증명 및 액세스 관리 (IAM) 역할 및 AWS 키 관리 서비스 (AWS KMS) 키. 또한 두 개의 버킷이 있다고 가정합니다. 하나는 코드 및 로그와 같은 아티팩트용이고 다른 하나는 데이터용입니다. 그만큼 basic_infra.yaml 파일은 예제를 제공합니다 AWS 클라우드 포메이션 필요한 전제 조건 인프라를 프로비저닝하는 코드. 예제 코드 및 배포 가이드는 GitHub의.

예를 들어 단일 파이프라인을 포함하는 파이프라인을 설정합니다. ProcessingStep 우리는 단순히 읽고 쓰는 것입니다 전복 데이터 셋 스파크를 사용하여. 코드 샘플은 다음을 설정하고 구성하는 방법을 보여줍니다. ProcessingStep.

파이프라인(이름, 역할, 버킷 등) 및 단계별 설정(인스턴스 유형 및 개수, 프레임워크 버전 등)에 대한 매개변수를 정의합니다. 이 예에서는 보안 설정을 사용하고 서브넷, 보안 그룹 및 컨테이너 간 트래픽 암호화도 정의합니다. 이 예에서는 SageMaker 전체 액세스 권한과 VPC가 있는 파이프라인 실행 역할이 필요합니다. 다음 코드를 참조하십시오.

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

시연을 위해 다음 코드 예제는 다음을 사용하여 파이프라인 내의 SageMaker 처리에서 PySpark 스크립트를 실행합니다. PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

앞의 코드에서 볼 수 있듯이 다음을 제공하여 기본 Spark 구성을 덮어씁니다. configuration.jsonProcessingInput. 우리는 configuration.json 에 저장된 파일 아마존 단순 스토리지 서비스 (Amazon S3) 다음 설정:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

파일을 다음으로 전달하여 기본 Spark 구성을 업데이트할 수 있습니다. ProcessingInput 또는 다음을 실행할 때 구성 인수를 사용하여 run() 기능.

Spark 구성은 처리 작업을 위해 선택한 인스턴스 유형 및 인스턴스 수와 같은 다른 옵션에 따라 다릅니다. 첫 번째 고려 사항은 인스턴스 수, 각 인스턴스에 있는 vCPU 코어 및 인스턴스 메모리입니다. 당신이 사용할 수있는 스파크 UI or CloudWatch 인스턴스 지표 여러 번의 실행 반복에 걸쳐 이러한 값을 보정하기 위한 로그입니다.

또한 실행기 및 드라이버 설정을 더욱 최적화할 수 있습니다. 이를 계산하는 방법에 대한 예는 다음을 참조하십시오. Amazon EMR에서 Apache Spark 애플리케이션의 메모리를 성공적으로 관리하기 위한 모범 사례.

다음으로 드라이버 및 실행기 설정의 경우 커미터 설정을 조사하여 Amazon S3에 쓸 때 성능을 개선하는 것이 좋습니다. 우리의 경우 Parquet 파일을 Amazon S3에 쓰고 "spark.sql.parquet.fs.optimized.comitter.optimization-enabled”을 true로 설정합니다.

Amazon S3 연결에 필요한 경우 지역 엔드포인트 "spark.hadoop.fs.s3a.endpoint”는 구성 파일 내에서 지정할 수 있습니다.

이 예제 파이프라인에서 PySpark 스크립트는 spark_process.py (다음 코드에 표시된 대로) Amazon S3에서 CSV 파일을 Spark 데이터 프레임으로 로드하고 데이터를 Parquet로 다시 Amazon S3에 저장합니다.

하나의 인스턴스에서 기본 설정으로 전복 데이터 세트 읽기 및 쓰기를 수행할 수 있기 때문에 예제 구성은 워크로드에 비례하지 않습니다. 언급한 구성은 특정 요구 사항에 따라 정의해야 합니다.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Spark 처리 작업을 최적화하려면 CloudWatch 로그와 Spark UI를 사용할 수 있습니다. SageMaker 노트북 인스턴스에서 처리 작업을 실행하여 Spark UI를 생성할 수 있습니다. 당신은 볼 수 있습니다 파이프라인 내에서 실행되는 처리 작업을 위한 Spark UI by 히스토리 서버 실행 Spark UI 로그가 동일한 Amazon S3 위치에 저장된 경우 SageMaker 노트북 인스턴스 내.

정리

자습서를 따랐다면 더 이상 사용하지 않는 리소스를 삭제하여 요금 청구를 중지하는 것이 좋습니다. 확인 CloudFormation 스택 삭제 리소스를 생성하는 데 사용한 이렇게 하면 생성된 스택과 생성된 리소스가 삭제됩니다.

결론

이 게시물에서는 SageMaker 파이프라인 내에서 PySpark를 사용하여 안전한 SageMaker 처리 작업을 실행하는 방법을 보여주었습니다. 또한 Spark 구성을 사용하여 PySpark를 최적화하고 안전한 네트워킹 구성에서 실행되도록 처리 작업을 설정하는 방법을 시연했습니다.

다음 단계로 전체 모델 수명 주기를 자동화하는 방법과 안전하고 확장 가능한 MLOps 플랫폼을 구축한 고객 SageMaker 서비스 사용.


저자에 관하여

Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence에서 PySpark를 사용하여 보안 처리 작업을 실행하세요. 수직 검색. 일체 포함.마렌 수일만 의 데이터 과학자입니다. AWS 전문 서비스. 그녀는 비즈니스 성과를 달성하기 위해 AI/ML의 힘을 공개하는 산업 전반의 고객과 협력합니다. Maren은 2019년 XNUMX월부터 AWS에서 근무하고 있습니다. 여가 시간에는 킥복싱, 멋진 전망으로의 하이킹, 보드 게임의 밤을 즐깁니다.


Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence에서 PySpark를 사용하여 보안 처리 작업을 실행하세요. 수직 검색. 일체 포함.마이라 라데이라 탱케
AWS의 ML 전문가입니다. 데이터 과학에 대한 배경 지식이 있는 그녀는 업계 전반의 고객과 함께 ML 애플리케이션을 설계하고 구축한 9년의 경험을 가지고 있습니다. 기술 책임자로서 그녀는 고객이 최신 기술과 혁신적인 솔루션을 통해 비즈니스 가치를 빠르게 달성할 수 있도록 지원합니다. 여가 시간에 Maira는 따뜻한 곳에서 가족과 함께 여행하고 시간을 보내는 것을 즐깁니다.


Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence에서 PySpark를 사용하여 보안 처리 작업을 실행하세요. 수직 검색. 일체 포함.폴린 팅
의 데이터 과학자입니다. AWS 전문 서비스 팀. 그녀는 AI/ML 솔루션을 개발하여 고객이 비즈니스 결과를 달성하고 가속화할 수 있도록 지원합니다. 여가 시간에 Pauline은 여행, 서핑, 새로운 디저트 장소 시도를 즐깁니다.


Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence에서 PySpark를 사용하여 보안 처리 작업을 실행하세요. 수직 검색. 일체 포함.도널드 포수오
Sr 데이터 아키텍트입니다. AWS 전문 서비스 주로 Global Finance Service와 함께 일하는 팀입니다. 그는 고객과 협력하여 고객 비즈니스 문제를 해결하고 AWS 서비스 채택을 가속화하는 혁신적인 솔루션을 만듭니다. 여가 시간에 Donald는 독서, 달리기, 여행을 즐깁니다.

타임 스탬프 :

더보기 AWS 기계 학습