کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines اجرا کنید

کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines اجرا کنید

Amazon SageMaker Studio می تواند به شما در ساخت، آموزش، اشکال زدایی، استقرار و نظارت بر مدل های خود و مدیریت گردش کار یادگیری ماشین (ML) کمک کند. خطوط لوله آمازون SageMaker شما را قادر می سازد تا بسازید پلت فرم MLOps امن، مقیاس پذیر و انعطاف پذیر در استودیو

در این پست نحوه اجرای وظایف پردازش PySpark را در یک خط لوله توضیح می دهیم. این به هر کسی که می‌خواهد مدلی را با استفاده از Pipelines آموزش دهد، قادر می‌سازد تا داده‌های آموزشی، داده‌های استنتاج پس از پردازش، یا مدل‌ها را با استفاده از PySpark ارزیابی کند. این قابلیت به ویژه زمانی مرتبط است که شما نیاز به پردازش داده های در مقیاس بزرگ دارید. علاوه بر این، نحوه بهینه‌سازی مراحل PySpark خود را با استفاده از تنظیمات و گزارش‌های Spark UI نشان می‌دهیم.

خطوط لوله است آمازون SageMaker ابزاری برای ساخت و مدیریت خطوط لوله ML سرتاسر. این یک سرویس کاملاً مدیریت شده بر اساس تقاضا است که با SageMaker و سایر خدمات AWS یکپارچه شده است و بنابراین منابع را برای شما ایجاد و مدیریت می کند. این تضمین می کند که نمونه ها فقط در هنگام اجرای خطوط لوله تهیه و مورد استفاده قرار می گیرند. علاوه بر این، Pipelines توسط SageMaker Python SDK، به شما امکان می دهد خود را پیگیری کنید اصل و نسب داده و مراحل استفاده مجدد با ذخیره کردن آنها برای کاهش زمان و هزینه توسعه. یک خط لوله SageMaker می تواند استفاده کند مراحل پردازش برای پردازش داده ها یا انجام ارزیابی مدل.

هنگام پردازش داده های در مقیاس بزرگ، دانشمندان داده و مهندسان ML اغلب از آنها استفاده می کنند PySpark، یک رابط برای جرقه آپاچی در پایتون SageMaker تصاویر Docker از پیش ساخته شده را ارائه می دهد که شامل PySpark و سایر وابستگی های مورد نیاز برای اجرای کارهای پردازش داده های توزیع شده، از جمله تبدیل داده ها و مهندسی ویژگی ها با استفاده از چارچوب Spark است. اگرچه این تصاویر به شما اجازه می‌دهند تا به سرعت استفاده از PySpark را در کارهای پردازشی شروع کنید، پردازش داده‌های در مقیاس بزرگ اغلب به پیکربندی‌های Spark خاصی نیاز دارد تا محاسبات توزیع شده خوشه ایجاد شده توسط SageMaker را بهینه کند.

در مثال خود، ما یک خط لوله SageMaker ایجاد می کنیم که یک مرحله پردازش را اجرا می کند. برای اطلاعات بیشتر در مورد مراحل دیگری که می توانید به خط لوله اضافه کنید، به آن مراجعه کنید مراحل خط لوله.

کتابخانه پردازش SageMaker

SageMaker Processing می تواند با موارد خاص اجرا شود چارچوب (به عنوان مثال، SKlearnProcessor، PySparkProcessor، یا Hugging Face). مستقل از چارچوب مورد استفاده، هر کدام مرحله پردازش به موارد زیر نیاز دارد:

  • نام مرحله - نامی که برای مرحله خط لوله SageMaker خود استفاده می شود
  • آرگومان های مرحله ای - استدلال برای شما ProcessingStep

علاوه بر این، می توانید موارد زیر را ارائه دهید:

  • پیکربندی حافظه پنهان مرحله شما به منظور جلوگیری از اجرای غیر ضروری مرحله شما در خط لوله SageMaker
  • فهرستی از نام‌های گام، نمونه‌های گام، یا نمونه‌های مجموعه گام که ProcessingStep بستگی دارد
  • نام نمایشی ProcessingStep
  • توصیفی از ProcessingStep
  • فایل های اموال
  • سیاست‌ها را دوباره امتحان کنید

استدلال ها به دست می دهند ProcessingStep. می توانید از sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor کلاس برای اجرای برنامه Spark شما در داخل یک کار پردازشی.

هر پردازنده بسته به فریمورک نیازهای خود را دارد. این به بهترین شکل با استفاده از نشان داده شده است PySparkProcessor، که در آن می توانید اطلاعات اضافی را برای بهینه سازی ارسال کنید ProcessingStep بیشتر، به عنوان مثال از طریق configuration پارامتر هنگام اجرای کار شما

کارهای پردازش SageMaker را در یک محیط امن اجرا کنید

این بهترین تمرین برای ایجاد یک Amazon VPC خصوصی و پیکربندی آن به گونه ای که مشاغل شما از طریق اینترنت عمومی قابل دسترسی نباشد. کارهای پردازش SageMaker به شما این امکان را می دهد که زیرشبکه های خصوصی و گروه های امنیتی را در VPC خود مشخص کنید و همچنین جداسازی شبکه و رمزگذاری ترافیک بین کانتینری را با استفاده از NetworkConfig.VpcConfig پارامتر درخواست از CreateProcessingJob API. ما نمونه هایی از این پیکربندی را با استفاده از SageMaker SDK در بخش بعدی

PySpark ProcessingStep در SageMaker Pipelines

برای این مثال، ما فرض می‌کنیم که استودیو را در یک محیط امن از قبل موجود است، از جمله VPC، نقاط پایانی VPC، گروه‌های امنیتی، هویت AWS و مدیریت دسترسی (IAM) نقش ها، و سرویس مدیریت کلید AWS کلیدهای (AWS KMS). ما همچنین فرض می‌کنیم که شما دو سطل دارید: یکی برای مصنوعاتی مانند کد و گزارش‌ها و دیگری برای داده‌هایتان. را basic_infra.yaml فایل مثالی ارائه می دهد AWS CloudFormation کدی برای تهیه زیرساخت های پیش نیاز لازم. کد مثال و راهنمای استقرار نیز در دسترس است GitHub.

به عنوان مثال، ما یک خط لوله حاوی یک تک راه اندازی کردیم ProcessingStep که در آن ما به سادگی آن را می خوانیم و می نویسیم مجموعه داده آبالون با استفاده از اسپارک نمونه کد به شما نشان می دهد که چگونه می توانید آن را تنظیم و پیکربندی کنید ProcessingStep.

ما پارامترهایی را برای خط لوله (نام، نقش، سطل ها و غیره) و تنظیمات خاص مرحله (نوع و تعداد نمونه، نسخه چارچوب و غیره) تعریف می کنیم. در این مثال، ما از یک راه‌اندازی امن استفاده می‌کنیم و همچنین زیرشبکه‌ها، گروه‌های امنیتی و رمزگذاری ترافیک بین کانتینری را تعریف می‌کنیم. برای این مثال، شما به یک نقش اجرای خط لوله با دسترسی کامل SageMaker و یک VPC نیاز دارید. کد زیر را ببینید:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

برای نشان دادن، مثال کد زیر یک اسکریپت PySpark را در SageMaker Processing در یک خط لوله با استفاده از PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

همانطور که در کد قبلی نشان داده شده است، با ارائه تنظیمات پیش‌فرض Spark را بازنویسی می‌کنیم configuration.json به عنوان یک ProcessingInput. ما از a استفاده می کنیم configuration.json فایلی که در آن ذخیره شده است سرویس ذخیره سازی ساده آمازون (Amazon S3) با تنظیمات زیر:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

می‌توانیم پیکربندی پیش‌فرض Spark را با ارسال فایل به‌عنوان a به‌روزرسانی کنیم ProcessingInput یا با استفاده از آرگومان پیکربندی هنگام اجرای run() تابع.

پیکربندی Spark به گزینه های دیگری مانند نوع نمونه و تعداد نمونه انتخاب شده برای کار پردازش بستگی دارد. اولین مورد توجه تعداد نمونه ها، هسته های vCPU که هر یک از آن نمونه ها دارند و حافظه نمونه است. شما می توانید استفاده کنید رابط های کاربری Spark or معیارهای نمونه CloudWatch و لاگ برای کالیبره کردن این مقادیر در چندین تکرار اجرا می شود.

علاوه بر این، تنظیمات اجرایی و درایور را می توان حتی بیشتر بهینه کرد. برای مثالی از نحوه محاسبه اینها به ادامه مطلب مراجعه کنید بهترین روش ها برای مدیریت موفقیت آمیز حافظه برای برنامه های Apache Spark در آمازون EMR.

در مرحله بعد، برای تنظیمات درایور و مجری، توصیه می‌کنیم تنظیمات committer را برای بهبود عملکرد هنگام نوشتن در Amazon S3 بررسی کنید. در مورد ما، ما در حال نوشتن فایل های پارکت در آمازون S3 و تنظیم "spark.sql.parquet.fs.optimized.comitter.optimization-enabled” درست است.

در صورت نیاز برای اتصال به آمازون S3، یک نقطه پایانی منطقه ایspark.hadoop.fs.s3a.endpoint” را می توان در فایل تنظیمات مشخص کرد.

در این خط لوله مثال، اسکریپت PySpark spark_process.py (همانطور که در کد زیر نشان داده شده است) یک فایل CSV از Amazon S3 را در یک قاب داده Spark بارگذاری می کند و داده ها را به عنوان Parquet در Amazon S3 ذخیره می کند.

توجه داشته باشید که پیکربندی مثال ما با حجم کار متناسب نیست زیرا خواندن و نوشتن مجموعه داده abalone را می توان در تنظیمات پیش فرض در یک نمونه انجام داد. تنظیماتی که ذکر کردیم باید بر اساس نیازهای خاص شما تعریف شوند.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

برای بهینه‌سازی کارهای پردازش Spark، می‌توانید از گزارش‌های CloudWatch و همچنین Spark UI استفاده کنید. می‌توانید با اجرای یک کار پردازش در یک نمونه نوت‌بوک SageMaker، Spark UI را ایجاد کنید. می توانید مشاهده کنید Spark UI برای کارهای پردازشی که در یک خط لوله اجرا می شوند by اجرای سرور تاریخچه در یک نمونه نوت بوک SageMaker اگر گزارش‌های Spark UI در همان مکان Amazon S3 ذخیره شده باشند.

پاک کردن

اگر این آموزش را دنبال کردید، تمرین خوبی است که منابعی را که دیگر برای جلوگیری از تحمیل هزینه استفاده نمی شوند حذف کنید. مطمئن شوید که پشته CloudFormation را حذف کنید که برای ایجاد منابع خود استفاده کردید. این کار پشته ایجاد شده و همچنین منابع ایجاد شده را حذف می کند.

نتیجه

در این پست، نحوه اجرای یک کار پردازش امن SageMaker را با استفاده از PySpark در SageMaker Pipelines نشان دادیم. ما همچنین نشان دادیم که چگونه PySpark را با استفاده از تنظیمات Spark بهینه کنیم و کار پردازش خود را برای اجرا در یک پیکربندی شبکه ایمن تنظیم کنیم.

به عنوان گام بعدی، چگونگی خودکارسازی کل چرخه عمر مدل و چگونگی آن را بررسی کنید مشتریان پلتفرم های MLOps ایمن و مقیاس پذیر ایجاد کردند با استفاده از خدمات SageMaker


درباره نویسنده

کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence اجرا کنید. جستجوی عمودی Ai.مارن سویلمن دانشمند داده در خدمات حرفه ای AWS. او با مشتریان در سراسر صنایع کار می کند و از قدرت AI/ML برای دستیابی به نتایج تجاری خود پرده برداری می کند. مارن از نوامبر 2019 در AWS بوده است. او در اوقات فراغت خود از کیک بوکسینگ، پیاده روی به سمت مناظر عالی و شب های بازی روی تخته لذت می برد.


کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence اجرا کنید. جستجوی عمودی Ai.مایرا لادیرا تانکه
متخصص ML در AWS است. او با سابقه ای در علم داده، 9 سال تجربه معماری و ساخت برنامه های کاربردی ML با مشتریان در سراسر صنایع دارد. او به عنوان یک رهبر فنی، به مشتریان کمک می کند تا از طریق فناوری های نوظهور و راه حل های نوآورانه، دستیابی به ارزش تجاری خود را تسریع بخشند. مایرا در اوقات فراغت خود از مسافرت و گذراندن وقت با خانواده در مکانی گرم لذت می برد.


کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence اجرا کنید. جستجوی عمودی Ai.پائولین تینگ
دانشمند داده در خدمات حرفه ای AWS تیم او از مشتریان در دستیابی و سرعت بخشیدن به نتیجه کسب و کارشان با توسعه راه حل های AI/ML پشتیبانی می کند. پائولین در اوقات فراغت خود از سفر، موج سواری و امتحان مکان های دسر جدید لذت می برد.


کارهای پردازش ایمن را با استفاده از PySpark در Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence اجرا کنید. جستجوی عمودی Ai.دونالد فوسو
Sr Data Architect در خدمات حرفه ای AWS تیمی که بیشتر با خدمات مالی جهانی کار می کند. او با مشتریان درگیر می شود تا راه حل های نوآورانه ای ایجاد کند که به مشکلات تجاری مشتری رسیدگی می کند و پذیرش خدمات AWS را تسریع می بخشد. دونالد در اوقات فراغت خود از خواندن، دویدن و سفر لذت می برد.

تمبر زمان:

بیشتر از آموزش ماشین AWS