הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines

הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines

סטודיו SageMaker של אמזון יכול לעזור לך לבנות, לאמן, לנפות באגים, לפרוס ולנטר את המודלים שלך ולנהל את תהליכי העבודה שלך למידת מכונה (ML). צינורות SageMaker של אמזון מאפשר לך לבנות א פלטפורמת MLOps מאובטחת, ניתנת להרחבה וגמישה בתוך הסטודיו.

בפוסט זה, אנו מסבירים כיצד להפעיל עבודות עיבוד PySpark בתוך צינור. זה מאפשר לכל מי שרוצה לאמן מודל באמצעות Pipelines גם לעבד מראש נתוני אימון, נתוני מסקנות לאחר עיבוד, או להעריך מודלים באמצעות PySpark. יכולת זו רלוונטית במיוחד כאשר אתה צריך לעבד נתונים בקנה מידה גדול. בנוסף, אנו מציגים כיצד לבצע אופטימיזציה של שלבי PySpark שלך באמצעות תצורות ויומני ממשק משתמש של Spark.

צינורות הוא אמזון SageMaker כלי לבניית וניהול צינורות ML מקצה לקצה. זהו שירות מנוהל במלואו לפי דרישה, משולב עם SageMaker ושירותי AWS אחרים, ולכן יוצר ומנהל עבורך משאבים. זה מבטיח שמופעים מסופקים ומשמשים רק בעת הפעלת הצינורות. יתר על כן, Pipelines נתמך על ידי SageMaker Python SDK, ומאפשר לך לעקוב אחר שלך שושלת נתונים ו שלבי שימוש חוזר על ידי שמירתם במטמון כדי להקל על זמן ועלות הפיתוח. צינור SageMaker יכול להשתמש שלבי עיבוד לעבד נתונים או לבצע הערכת מודל.

בעת עיבוד נתונים בקנה מידה גדול, מדעני נתונים ומהנדסי ML משתמשים לעתים קרובות PySpark, ממשק עבור אפאצ 'י ספארק בפייתון. SageMaker מספקת תמונות Docker שנבנו מראש הכוללות PySpark ותלות אחרות הדרושים להפעלת עבודות עיבוד נתונים מבוזרות, כולל טרנספורמציות נתונים והנדסת תכונות באמצעות מסגרת Spark. למרות שתמונות אלו מאפשרות לך להתחיל להשתמש ב-PySpark במהירות בעבודות עיבוד, עיבוד נתונים בקנה מידה גדול דורש לרוב תצורות Spark ספציפיות על מנת לייעל את המחשוב המבוזר של האשכול שנוצר על ידי SageMaker.

בדוגמה שלנו, אנו יוצרים צינור של SageMaker המריץ שלב עיבוד בודד. למידע נוסף על אילו שלבים נוספים אתה יכול להוסיף לצינור, עיין שלבי צינור.

ספריית עיבוד SageMaker

SageMaker Processing יכול לפעול עם ספציפי מסגרות (לדוגמה, SKlearnProcessor, PySparkProcessor או Hugging Face). ללא תלות במסגרת המשמשת, כל אחד עיבוד שלב דורש את הדברים הבאים:

  • שם שלב – השם שישמש עבור שלב הצינור של SageMaker שלך
  • טיעוני צעד – הטיעונים שלך ProcessingStep

בנוסף, אתה יכול לספק את הדברים הבאים:

  • התצורה של מטמון הצעד שלך כדי למנוע ריצות מיותרות של הצעד שלך בצינור של SageMaker
  • רשימה של שמות שלבים, מופעי שלבים או מופעי איסוף שלבים שה- ProcessingStep תלוי
  • שם התצוגה של ProcessingStep
  • תיאור של ProcessingStep
  • קבצי נכסים
  • נסה שוב מדיניות

הטיעונים נמסרים לידי ProcessingStep. אתה יכול להשתמש ב- sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor מחלקה כדי להפעיל את אפליקציית Spark שלך בתוך עבודת עיבוד.

כל מעבד מגיע עם הצרכים שלו, בהתאם למסגרת. זה מומחש בצורה הטובה ביותר באמצעות ה PySparkProcessor, שבו תוכל להעביר מידע נוסף כדי לייעל את ProcessingStep בהמשך, למשל באמצעות ה configuration פרמטר בעת הפעלת העבודה שלך.

הפעל עבודות SageMaker Processing בסביבה מאובטחת

זה התרגול הטוב ביותר כדי ליצור VPC פרטי של Amazon ולהגדיר אותו כך שהעבודות שלך לא יהיו נגישות דרך האינטרנט הציבורי. משרות SageMaker Processing מאפשרות לך לציין את רשתות המשנה וקבוצות האבטחה הפרטיות ב-VPC שלך וכן לאפשר בידוד רשת והצפנת תעבורה בין-מכולות באמצעות NetworkConfig.VpcConfig פרמטר בקשה של CreateProcessingJob ממשק API. אנו מספקים דוגמאות לתצורה זו באמצעות ה SageMaker SDK בחלק הבא.

PySpark ProcessingStep בתוך SageMaker Pipelines

עבור דוגמה זו, אנו מניחים ש-Studio פרוס בסביבה מאובטחת שכבר זמינה, כולל VPC, נקודות קצה VPC, קבוצות אבטחה, AWS זהות וניהול גישה (IAM) תפקידים, ו שירות ניהול מפתח AWS (AWS KMS) מקשי. אנו גם מניחים שיש לך שני דליים: אחד עבור חפצים כמו קוד ויומנים, ואחד עבור הנתונים שלך. ה basic_infra.yaml הקובץ מספק דוגמה AWS CloudFormation קוד כדי לספק את התשתית הדרושה הדרוש. הקוד לדוגמה ומדריך הפריסה זמין גם ב- GitHub.

כדוגמה, הקמנו צינור המכיל סינגל ProcessingStep שבו אנחנו פשוט קוראים וכותבים את מערך נתונים בטחונות באמצעות Spark. דוגמאות הקוד מראות לך כיצד להגדיר ולהגדיר את ProcessingStep.

אנו מגדירים פרמטרים עבור הצינור (שם, תפקיד, דליים וכן הלאה) והגדרות ספציפיות לשלב (סוג וספירת מופעים, גרסת מסגרת וכדומה). בדוגמה זו, אנו משתמשים בהגדרה מאובטחת ומגדירים גם רשתות משנה, קבוצות אבטחה והצפנת תעבורה בין-מכולות. עבור דוגמה זו, אתה צריך תפקיד ביצוע צינור עם גישה מלאה של SageMaker ו-VPC. ראה את הקוד הבא:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

כדי להדגים, דוגמת הקוד הבאה מריצה סקריפט PySpark על SageMaker Processing בתוך צינור באמצעות PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

כפי שמוצג בקוד הקודם, אנו מחליפים את תצורות ברירת המחדל של Spark על ידי מתן configuration.json בתור ProcessingInput. אנו משתמשים ב- a configuration.json קובץ שנשמר בו שירות אחסון פשוט של אמזון (Amazon S3) עם ההגדרות הבאות:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

נוכל לעדכן את תצורת ברירת המחדל של Spark על ידי העברת הקובץ בתור ProcessingInput או באמצעות ארגומנט התצורה בעת הפעלת ה- run() פונקציה.

תצורת Spark תלויה באפשרויות אחרות, כמו סוג המופע וספירת המופעים שנבחרו עבור עבודת העיבוד. השיקול הראשון הוא מספר המופעים, ליבות ה-vCPU שיש לכל אחד מאותם מופעים וזיכרון המופעים. אתה יכול להשתמש ממשק משתמשי Spark or מדדי מופע של CloudWatch ויומנים לכייל ערכים אלה על פני מספר איטרציות של ריצה.

בנוסף, ניתן לבצע אופטימיזציה נוספת של הגדרות הביצוע והנהג. לדוגמא כיצד לחשב אותם, עיין ב שיטות עבודה מומלצות לניהול מוצלח של זיכרון עבור יישומי Apache Spark באמזון EMR.

לאחר מכן, עבור הגדרות מנהל ההתקן והביצוע, אנו ממליצים לחקור את הגדרות ה-committer כדי לשפר את הביצועים בעת כתיבה לאמזון S3. במקרה שלנו, אנחנו כותבים קבצי פרקט לאמזון S3 ומגדירים "spark.sql.parquet.fs.optimized.comitter.optimization-enabled”לאמיתי.

במידת הצורך לחיבור לאמזון S3, נקודת קצה אזורית "spark.hadoop.fs.s3a.endpointניתן לציין בתוך קובץ ההגדרות.

בצינור לדוגמה זה, סקריפט PySpark spark_process.py (כפי שמוצג בקוד הבא) טוען קובץ CSV מאמזון S3 לתוך מסגרת נתונים של Spark, ושומר את הנתונים כ-Parquet בחזרה לאמזון S3.

שים לב שהתצורה לדוגמה שלנו אינה פרופורציונלית לעומס העבודה, מכיוון שקריאת וכתיבה של מערך הנתונים האבלוני יכולים להתבצע בהגדרות ברירת המחדל במופע אחד. יש להגדיר את התצורות שהזכרנו בהתאם לצרכים הספציפיים שלך.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

כדי לצלול לתוך אופטימיזציה של עבודות עיבוד Spark, אתה יכול להשתמש ביומני CloudWatch כמו גם בממשק המשתמש של Spark. אתה יכול ליצור את ממשק המשתמש של Spark על ידי הפעלת עבודת עיבוד במופע מחברת SageMaker. אתה יכול לצפות ב ממשק המשתמש של Spark עבור משימות העיבוד הפועלות בתוך צינור by הפעלת שרת ההיסטוריה בתוך מופע מחברת SageMaker אם יומני ממשק המשתמש של Spark נשמרו באותו מיקום של Amazon S3.

לנקות את

אם עקבת אחר המדריך, מומלץ למחוק משאבים שאינם משמשים עוד כדי להפסיק להיגרר חיובים. הקפד לעשות זאת למחוק את ערימת CloudFormation שהשתמשת בהם כדי ליצור את המשאבים שלך. פעולה זו תמחק את המחסנית שנוצרה כמו גם את המשאבים שהיא יצרה.

סיכום

בפוסט זה, הראינו כיצד להפעיל עבודת SageMaker Processing מאובטחת באמצעות PySpark בתוך SageMaker Pipelines. הדגמנו גם כיצד לייעל את PySpark באמצעות תצורות Spark ולהגדיר את עבודת העיבוד שלך כך שתפעל בתצורת רשת מאובטחת.

כשלב הבא, חקור כיצד להפוך את כל מחזור החיים של המודל לאוטומטי וכיצד לקוחות בנו פלטפורמות MLOps מאובטחות וניתנות להרחבה באמצעות שירותי SageMaker.


על הכותבים

הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.מרן סוילמן הוא מדען נתונים ב שירותים מקצועיים של AWS. היא עובדת עם לקוחות בכל תעשיות וחושפת את הכוח של AI/ML כדי להשיג את התוצאות העסקיות שלהם. Maren עובדת עם AWS מאז נובמבר 2019. בזמנה הפנוי, היא נהנית מקיקבוקסינג, טיולים אל נופים נהדרים ומערבי משחקי לוח.


הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.מאירה לדירה טנקה
הוא מומחה ML ב-AWS. עם רקע במדעי הנתונים, יש לה 9 שנות ניסיון באדריכלות ובניית יישומי ML עם לקוחות בכל תעשיות. כמובילה טכנית, היא עוזרת ללקוחות להאיץ את השגת הערך העסקי שלהם באמצעות טכנולוגיות מתפתחות ופתרונות חדשניים. בזמנה הפנוי, מאירה נהנית לטייל ולבלות עם משפחתה במקום חמים.


הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.פאולין טינג
הוא Data Scientist ב- שירותים מקצועיים של AWS קְבוּצָה. היא תומכת בלקוחות בהשגת והאצת התוצאות העסקיות שלהם על ידי פיתוח פתרונות AI/ML. בזמנה הפנוי, פאולין נהנית לטייל, לגלוש ולנסות מקומות קינוחים חדשים.


הפעל עבודות עיבוד מאובטחות באמצעות PySpark ב-Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. חיפוש אנכי. איי.דונלד פוסאו
הוא Sr Data Architect ב- שירותים מקצועיים של AWS צוות, בעיקר עובד עם Global Finance Service. הוא מתקשר עם לקוחות כדי ליצור פתרונות חדשניים הנותנים מענה לבעיות עסקיות של לקוחות ומאיצים את האימוץ של שירותי AWS. בזמנו הפנוי דונלד נהנה לקרוא, לרוץ ולטייל.

בול זמן:

עוד מ למידת מכונות AWS