Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines

Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines

Xưởng sản xuất Amazon SageMaker có thể giúp bạn xây dựng, đào tạo, gỡ lỗi, triển khai và giám sát các mô hình cũng như quản lý quy trình công việc máy học (ML) của bạn. Đường ống Amazon SageMaker cho phép bạn xây dựng một nền tảng MLOps an toàn, có thể mở rộng và linh hoạt trong Studio.

Trong bài đăng này, chúng tôi giải thích cách chạy các công việc xử lý PySpark trong một đường dẫn. Điều này cho phép bất kỳ ai muốn đào tạo một mô hình bằng Đường ống cũng có thể xử lý trước dữ liệu đào tạo, dữ liệu suy luận sau xử lý hoặc đánh giá các mô hình bằng PySpark. Khả năng này đặc biệt phù hợp khi bạn cần xử lý dữ liệu quy mô lớn. Ngoài ra, chúng tôi giới thiệu cách tối ưu hóa các bước PySpark của bạn bằng cách sử dụng cấu hình và nhật ký giao diện người dùng Spark.

Đường ống là một Amazon SageMaker công cụ để xây dựng và quản lý các quy trình ML từ đầu đến cuối. Đó là dịch vụ theo yêu cầu được quản lý hoàn toàn, được tích hợp với SageMaker và các dịch vụ AWS khác, do đó tạo và quản lý tài nguyên cho bạn. Điều này đảm bảo rằng các phiên bản chỉ được cung cấp và sử dụng khi chạy quy trình. Hơn nữa, Pipelines được hỗ trợ bởi SDK Python của SageMaker, cho phép bạn theo dõi dòng dữ liệutái sử dụng các bước bằng cách lưu trữ chúng để giảm bớt thời gian và chi phí phát triển. Một đường dẫn SageMaker có thể sử dụng các bước xử lý để xử lý dữ liệu hoặc thực hiện đánh giá mô hình.

Khi xử lý dữ liệu quy mô lớn, các nhà khoa học dữ liệu và kỹ sư ML thường sử dụng PySpark, một giao diện cho Apache Spark trong Python. SageMaker cung cấp hình ảnh Docker dựng sẵn bao gồm PySpark và các phụ thuộc khác cần thiết để chạy các công việc xử lý dữ liệu phân tán, bao gồm chuyển đổi dữ liệu và kỹ thuật tính năng bằng cách sử dụng khung Spark. Mặc dù những hình ảnh đó cho phép bạn nhanh chóng bắt đầu sử dụng PySpark trong các công việc xử lý, nhưng việc xử lý dữ liệu quy mô lớn thường yêu cầu các cấu hình Spark cụ thể để tối ưu hóa điện toán phân tán của cụm do SageMaker tạo.

Trong ví dụ của chúng tôi, chúng tôi tạo một quy trình SageMaker chạy một bước xử lý. Để biết thêm thông tin về những bước khác mà bạn có thể thêm vào quy trình bán hàng, hãy tham khảo Các bước đường ống.

Thư viện xử lý SageMaker

SageMaker Xử lý có thể chạy với cụ thể khung (ví dụ: SKlearnProcessor, PySparkProcessor hoặc Ôm mặt). Độc lập với khuôn khổ được sử dụng, mỗi Xử lýBước yêu cầu những điều sau:

  • tên bước – Tên sẽ được sử dụng cho bước quy trình SageMaker của bạn
  • đối số bước – Các lập luận cho bạn ProcessingStep

Ngoài ra, bạn có thể cung cấp như sau:

  • Cấu hình cho bộ đệm ẩn bước của bạn để tránh các lần chạy bước không cần thiết trong quy trình SageMaker
  • Danh sách tên bước, thể hiện bước hoặc thể hiện bộ sưu tập bước mà ProcessingStep phụ thuộc vào
  • Tên hiển thị của ProcessingStep
  • Mô tả về ProcessingStep
  • tập tin tài sản
  • Thử lại chính sách

Các đối số được bàn giao cho ProcessingStep. Bạn có thể dùng sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class để chạy ứng dụng Spark của bạn bên trong một công việc đang xử lý.

Mỗi bộ xử lý đi kèm với nhu cầu riêng của nó, tùy thuộc vào khung. Điều này được minh họa tốt nhất bằng cách sử dụng PySparkProcessor, nơi bạn có thể chuyển thông tin bổ sung để tối ưu hóa ProcessingStep hơn nữa, ví dụ thông qua configuration tham số khi chạy công việc của bạn.

Chạy các công việc Xử lý SageMaker trong một môi trường an toàn

thực hành tốt nhất để tạo một Amazon VPC riêng và định cấu hình nó để công việc của bạn không thể truy cập được qua internet công cộng. Các công việc Xử lý SageMaker cho phép bạn chỉ định các mạng con riêng tư và nhóm bảo mật trong VPC của mình cũng như kích hoạt cách ly mạng và mã hóa lưu lượng giữa các vùng chứa bằng cách sử dụng NetworkConfig.VpcConfig tham số yêu cầu của CreateProcessingJob API. Chúng tôi cung cấp các ví dụ về cấu hình này bằng cách sử dụng SDK SageMaker trong phần tiếp theo.

Xử lý PySparkBước trong Đường ống SageMaker

Đối với ví dụ này, chúng tôi giả định rằng bạn đã triển khai Studio trong một môi trường an toàn, bao gồm VPC, điểm cuối VPC, nhóm bảo mật, Quản lý truy cập và nhận dạng AWS (IAM) vai trò, và Dịch vụ quản lý khóa AWS (AWS KMS). Chúng tôi cũng giả định rằng bạn có hai nhóm: một nhóm dành cho các thành phần lạ như mã và nhật ký và một nhóm dành cho dữ liệu của bạn. Các basic_infra.yaml tập tin cung cấp ví dụ Hình thành đám mây AWS mã để cung cấp cơ sở hạ tầng tiên quyết cần thiết. Mã ví dụ và hướng dẫn triển khai cũng có sẵn trên GitHub.

Ví dụ, chúng tôi thiết lập một đường ống chứa một ProcessingStep trong đó chúng tôi chỉ đơn giản là đọc và viết tập dữ liệu bào ngư sử dụng Spark. Các mẫu mã chỉ cho bạn cách thiết lập và định cấu hình ProcessingStep.

Chúng tôi xác định các tham số cho quy trình (tên, vai trò, bộ chứa, v.v.) và cài đặt theo từng bước cụ thể (loại và số lượng phiên bản, phiên bản khung, v.v.). Trong ví dụ này, chúng tôi sử dụng một thiết lập an toàn, đồng thời xác định các mạng con, nhóm bảo mật và mã hóa lưu lượng liên vùng chứa. Đối với ví dụ này, bạn cần có vai trò thực thi quy trình với toàn quyền truy cập SageMaker và một VPC. Xem đoạn mã sau:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Để chứng minh, ví dụ mã sau chạy tập lệnh PySpark trên SageMaker Xử lý trong một đường dẫn bằng cách sử dụng PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Như được hiển thị trong đoạn mã trước, chúng tôi đang ghi đè cấu hình Spark mặc định bằng cách cung cấp configuration.json như là một ProcessingInput. chúng tôi sử dụng một configuration.json tập tin đã được lưu trong Dịch vụ lưu trữ đơn giản của Amazon (Amazon S3) với các cài đặt sau:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Chúng tôi có thể cập nhật cấu hình Spark mặc định bằng cách chuyển tệp dưới dạng ProcessingInput hoặc bằng cách sử dụng đối số cấu hình khi chạy run() chức năng.

Cấu hình Spark phụ thuộc vào các tùy chọn khác, chẳng hạn như loại phiên bản và số lượng phiên bản được chọn cho tác vụ xử lý. Việc xem xét đầu tiên là số lượng phiên bản, lõi vCPU mà mỗi phiên bản đó có và bộ nhớ phiên bản. Bạn có thể dùng Giao diện người dùng Spark or Chỉ số phiên bản CloudWatch và ghi nhật ký để hiệu chỉnh các giá trị này qua nhiều lần chạy.

Ngoài ra, cài đặt trình điều khiển và trình điều khiển có thể được tối ưu hóa hơn nữa. Để biết ví dụ về cách tính toán những giá trị này, hãy tham khảo Các phương pháp hay nhất để quản lý thành công bộ nhớ cho các ứng dụng Apache Spark trên Amazon EMR.

Tiếp theo, đối với cài đặt trình điều khiển và trình thực thi, chúng tôi khuyên bạn nên điều tra cài đặt trình chuyển giao để cải thiện hiệu suất khi ghi vào Amazon S3. Trong trường hợp của chúng tôi, chúng tôi đang ghi tệp Parquet lên Amazon S3 và cài đặt “spark.sql.parquet.fs.optimized.comitter.optimization-enabled” thành sự thật.

Nếu cần kết nối với Amazon S3, một điểm cuối khu vực “spark.hadoop.fs.s3a.endpoint” có thể được chỉ định trong tệp cấu hình.

Trong đường dẫn ví dụ này, tập lệnh PySpark spark_process.py (như minh họa trong đoạn mã sau) tải tệp CSV từ Amazon S3 vào khung dữ liệu Spark và lưu dữ liệu dưới dạng Parquet trở lại Amazon S3.

Lưu ý rằng cấu hình ví dụ của chúng tôi không tương ứng với khối lượng công việc vì việc đọc và ghi tập dữ liệu bào ngư có thể được thực hiện trên cài đặt mặc định trên một phiên bản. Các cấu hình chúng tôi đã đề cập phải được xác định dựa trên nhu cầu cụ thể của bạn.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Để đi sâu vào việc tối ưu hóa các tác vụ xử lý Spark, bạn có thể sử dụng nhật ký CloudWatch cũng như Giao diện người dùng Spark. Bạn có thể tạo giao diện người dùng Spark bằng cách chạy công việc Đang xử lý trên phiên bản sổ ghi chép SageMaker. Bạn có thể xem Giao diện người dùng Spark cho các công việc Xử lý đang chạy trong một đường ống dẫn by chạy máy chủ lịch sử trong phiên bản sổ ghi chép SageMaker nếu nhật ký giao diện người dùng Spark được lưu trong cùng một vị trí Amazon S3.

Làm sạch

Nếu bạn đã làm theo hướng dẫn, bạn nên xóa các tài nguyên không còn được sử dụng để ngừng phát sinh phí. đảm bảo xóa ngăn xếp CloudFormation mà bạn đã sử dụng để tạo tài nguyên của mình. Thao tác này sẽ xóa ngăn xếp đã tạo cũng như các tài nguyên mà ngăn xếp đó đã tạo.

Kết luận

Trong bài đăng này, chúng tôi đã trình bày cách chạy công việc Xử lý SageMaker an toàn bằng PySpark trong Đường ống SageMaker. Chúng tôi cũng đã trình bày cách tối ưu hóa PySpark bằng cấu hình Spark và thiết lập công việc Xử lý của bạn để chạy trong cấu hình mạng an toàn.

Bước tiếp theo, khám phá cách tự động hóa toàn bộ vòng đời của mô hình và cách khách hàng đã xây dựng nền tảng MLOps an toàn và có thể mở rộng sử dụng dịch vụ SageMaker.


Về các tác giả

Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Maren Suilmann là Nhà khoa học dữ liệu tại Dịch vụ chuyên nghiệp của AWS. Cô ấy làm việc với khách hàng trong các ngành để tiết lộ sức mạnh của AI/ML để đạt được kết quả kinh doanh của họ. Maren đã làm việc với AWS kể từ tháng 2019 năm XNUMX. Trong thời gian rảnh rỗi, cô ấy thích kickboxing, đi bộ đường dài để ngắm cảnh đẹp và chơi trò chơi board game hàng đêm.


Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Maira Ladeira Tanke
là Chuyên gia ML tại AWS. Với nền tảng về khoa học dữ liệu, cô ấy có 9 năm kinh nghiệm kiến ​​trúc và xây dựng các ứng dụng ML với khách hàng trong các ngành. Với tư cách là trưởng nhóm kỹ thuật, cô ấy giúp khách hàng đẩy nhanh việc đạt được giá trị kinh doanh thông qua các công nghệ mới nổi và giải pháp sáng tạo. Khi rảnh rỗi, Maira thích đi du lịch và dành thời gian cho gia đình ở một nơi ấm áp.


Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Pauline Ting
là Nhà khoa học dữ liệu trong Dịch vụ chuyên nghiệp của AWS đội. Cô hỗ trợ khách hàng đạt được và đẩy nhanh kết quả kinh doanh của họ bằng cách phát triển các giải pháp AI/ML. Khi rảnh rỗi, Pauline thích đi du lịch, lướt sóng và thử những địa điểm tráng miệng mới.


Chạy các công việc xử lý an toàn bằng PySpark trong Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Tìm kiếm dọc. Ái.Donald Fossouo
là một Kiến trúc sư dữ liệu Sr trong Dịch vụ chuyên nghiệp của AWS nhóm, chủ yếu làm việc với Global Finance Service. Anh ấy tương tác với khách hàng để tạo ra các giải pháp sáng tạo nhằm giải quyết các vấn đề kinh doanh của khách hàng và đẩy nhanh việc áp dụng các dịch vụ AWS. Khi rảnh rỗi, Donald thích đọc sách, chạy bộ và đi du lịch.

Dấu thời gian:

Thêm từ Học máy AWS