Jalankan pekerjaan pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines

Jalankan pekerjaan pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines

Studio Amazon SageMaker dapat membantu Anda membuat, melatih, men-debug, menerapkan, dan memantau model Anda serta mengelola alur kerja pembelajaran mesin (ML). Pipa Amazon SageMaker memungkinkan Anda untuk membangun platform MLOps yang aman, terukur, dan fleksibel dalam Studio.

Dalam posting ini, kami menjelaskan cara menjalankan pekerjaan pemrosesan PySpark dalam sebuah pipa. Hal ini memungkinkan siapa pun yang ingin melatih model menggunakan Pipelines untuk juga melakukan praproses data pelatihan, data inferensi pascaproses, atau mengevaluasi model menggunakan PySpark. Kemampuan ini sangat relevan saat Anda perlu memproses data berskala besar. Selain itu, kami menunjukkan cara mengoptimalkan langkah-langkah PySpark Anda menggunakan konfigurasi dan log Spark UI.

Pipa adalah suatu Amazon SageMaker alat untuk membuat dan mengelola pipeline ML ujung ke ujung. Ini adalah layanan sesuai permintaan yang dikelola sepenuhnya, terintegrasi dengan SageMaker dan layanan AWS lainnya, sehingga membuat dan mengelola sumber daya untuk Anda. Hal ini memastikan bahwa instans hanya disediakan dan digunakan saat menjalankan pipeline. Selanjutnya, Pipelines didukung oleh SDK Python SageMaker, memungkinkan Anda melacak garis keturunan data dan menggunakan kembali langkah-langkah dengan menyimpannya untuk memudahkan waktu dan biaya pengembangan. Pipeline SageMaker dapat digunakan langkah-langkah pemrosesan memproses data atau melakukan evaluasi model.

Saat memproses data skala besar, ilmuwan data dan insinyur ML sering menggunakannya PySpark, antarmuka untuk Apache Spark dengan Python. SageMaker menyediakan image Docker prebuilt yang menyertakan PySpark dan dependensi lain yang diperlukan untuk menjalankan tugas pemrosesan data terdistribusi, termasuk transformasi data dan rekayasa fitur menggunakan framework Spark. Meskipun gambar tersebut memungkinkan Anda untuk mulai menggunakan PySpark dengan cepat dalam tugas pemrosesan, pemrosesan data skala besar sering kali memerlukan konfigurasi Spark khusus untuk mengoptimalkan komputasi terdistribusi dari klaster yang dibuat oleh SageMaker.

Dalam contoh kami, kami membuat pipeline SageMaker yang menjalankan satu langkah pemrosesan. Untuk informasi lebih lanjut tentang langkah lain apa yang dapat Anda tambahkan ke alur, lihat Langkah Pipa.

Pustaka Pemrosesan SageMaker

Pemrosesan SageMaker dapat berjalan dengan spesifik kerangka (misalnya, SKlearnProcessor, PySparkProcessor, atau Hugging Face). Independen dari kerangka yang digunakan, masing-masing Pemrosesan langkah membutuhkan hal-hal berikut:

  • Nama langkah โ€“ Nama yang akan digunakan untuk langkah pipeline SageMaker Anda
  • Langkah argumen โ€“ Argumen untuk Anda ProcessingStep

Selain itu, Anda dapat memberikan yang berikut ini:

  • Konfigurasi untuk cache langkah Anda untuk menghindari langkah Anda yang tidak perlu dijalankan dalam pipeline SageMaker
  • Daftar nama langkah, contoh langkah, atau contoh kumpulan langkah yang ProcessingStep tergantung pada
  • Nama tampilan dari ProcessingStep
  • Deskripsi dari ProcessingStep
  • File properti
  • Coba lagi kebijakan

Argumentasi diserahkan kepada ProcessingStep. Anda dapat menggunakan sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor kelas untuk menjalankan aplikasi Spark Anda di dalam pekerjaan pemrosesan.

Setiap prosesor hadir dengan kebutuhannya masing-masing, tergantung frameworknya. Ini paling baik diilustrasikan menggunakan PySparkProcessor, tempat Anda dapat memberikan informasi tambahan untuk mengoptimalkan ProcessingStep lebih lanjut, misalnya melalui configuration parameter saat menjalankan pekerjaan Anda.

Jalankan tugas Pemrosesan SageMaker di lingkungan yang aman

Nya praktek terbaik untuk membuat VPC Amazon pribadi dan mengonfigurasinya agar pekerjaan Anda tidak dapat diakses melalui internet publik. Tugas Pemrosesan SageMaker memungkinkan Anda menentukan subnet pribadi dan grup keamanan di VPC Anda serta mengaktifkan isolasi jaringan dan enkripsi lalu lintas antar-wadah menggunakan NetworkConfig.VpcConfig parameter permintaan dari CreateProcessingJob API. Kami memberikan contoh konfigurasi ini menggunakan SDK SageMaker di bagian selanjutnya.

Langkah Pemrosesan PySpark dalam SageMaker Pipelines

Untuk contoh ini, kami berasumsi bahwa Anda telah menerapkan Studio di lingkungan aman yang sudah tersedia, termasuk VPC, titik akhir VPC, grup keamanan, Identitas AWS dan Manajemen Akses (IAM) peran, dan Layanan Manajemen Kunci AWS (AWS KMS) kunci. Kami juga berasumsi bahwa Anda memiliki dua keranjang: satu untuk artefak seperti kode dan log, dan satu lagi untuk data Anda. Itu basic_infra.yaml file memberikan contoh Formasi AWS Cloud kode untuk menyediakan infrastruktur prasyarat yang diperlukan. Kode contoh dan panduan penerapan juga tersedia di GitHub.

Sebagai contoh, kami menyiapkan saluran pipa yang berisi satu ProcessingStep di mana kita hanya membaca dan menulis kumpulan data abalon menggunakan Spark. Contoh kode menunjukkan cara menyiapkan dan mengonfigurasi ProcessingStep.

Kami menentukan parameter untuk pipeline (nama, peran, bucket, dan sebagainya) dan pengaturan spesifik langkah (jenis dan jumlah instance, versi framework, dan sebagainya). Dalam contoh ini, kami menggunakan penyiapan aman dan juga menentukan subnet, grup keamanan, dan enkripsi lalu lintas antar-kontainer. Untuk contoh ini, Anda memerlukan peran eksekusi pipeline dengan akses penuh SageMaker dan VPC. Lihat kode berikut:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Untuk mendemonstrasikan, contoh kode berikut menjalankan skrip PySpark di Pemrosesan SageMaker dalam saluran pipa dengan menggunakan PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Seperti yang ditunjukkan pada kode sebelumnya, kami menimpa konfigurasi Spark default dengan memberikan configuration.json sebagai ProcessingInput. Kami menggunakan a configuration.json file yang disimpan di Layanan Penyimpanan Sederhana Amazon (Amazon S3) dengan pengaturan berikut:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Kami dapat memperbarui konfigurasi Spark default dengan meneruskan file sebagai a ProcessingInput atau dengan menggunakan argumen konfigurasi saat menjalankan run() fungsi.

Konfigurasi Spark bergantung pada opsi lain, seperti jenis instans dan jumlah instans yang dipilih untuk tugas pemrosesan. Pertimbangan pertama adalah jumlah instans, inti vCPU yang dimiliki masing-masing instans tersebut, dan memori instans. Anda dapat gunakan Spark UI or Metrik instans CloudWatch dan log untuk mengkalibrasi nilai-nilai ini selama beberapa iterasi yang dijalankan.

Selain itu, pengaturan eksekutor dan driver dapat dioptimalkan lebih jauh lagi. Untuk contoh cara menghitungnya, lihat Praktik terbaik agar berhasil mengelola memori untuk aplikasi Apache Spark di Amazon EMR.

Selanjutnya, untuk pengaturan driver dan eksekutor, kami menyarankan untuk menyelidiki pengaturan committer guna meningkatkan kinerja saat menulis ke Amazon S3. Dalam kasus kami, kami menulis file Parquet ke Amazon S3 dan menyetel "spark.sql.parquet.fs.optimized.comitter.optimization-enabledโ€ menjadi benar.

Jika diperlukan untuk koneksi ke Amazon S3, titik akhir regional โ€œspark.hadoop.fs.s3a.endpointโ€ dapat ditentukan dalam file konfigurasi.

Dalam contoh pipeline ini, skrip PySpark spark_process.py (seperti yang ditunjukkan dalam kode berikut) memuat file CSV dari Amazon S3 ke dalam bingkai data Spark, dan menyimpan data sebagai Parquet kembali ke Amazon S3.

Perhatikan bahwa konfigurasi contoh kami tidak sebanding dengan beban kerja karena membaca dan menulis kumpulan data abalon dapat dilakukan pada pengaturan default pada satu contoh. Konfigurasi yang kami sebutkan harus ditentukan berdasarkan kebutuhan spesifik Anda.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Untuk mengoptimalkan tugas pemrosesan Spark, Anda dapat menggunakan log CloudWatch serta UI Spark. Anda dapat membuat UI Spark dengan menjalankan tugas Pemrosesan pada instans notebook SageMaker. Anda dapat melihat Spark UI untuk pekerjaan Pemrosesan yang berjalan dalam saluran pipa by menjalankan server sejarah dalam instans notebook SageMaker jika log Spark UI disimpan dalam lokasi Amazon S3 yang sama.

Membersihkan

Jika Anda mengikuti tutorial, praktik yang baik adalah menghapus sumber daya yang tidak lagi digunakan untuk menghentikan pengenaan biaya. Pastikan untuk hapus tumpukan CloudFormation yang Anda gunakan untuk membuat sumber daya. Ini akan menghapus tumpukan yang dibuat serta sumber daya yang dibuatnya.

Kesimpulan

Dalam postingan ini, kami menunjukkan cara menjalankan tugas Pemrosesan SageMaker yang aman menggunakan PySpark di dalam SageMaker Pipelines. Kami juga mendemonstrasikan cara mengoptimalkan PySpark menggunakan konfigurasi Spark dan menyiapkan tugas Pemrosesan Anda untuk dijalankan dalam konfigurasi jaringan yang aman.

Sebagai langkah selanjutnya, jelajahi cara mengotomatiskan seluruh siklus hidup model dan caranya pelanggan membangun platform MLOps yang aman dan dapat diskalakan menggunakan layanan SageMaker.


Tentang Penulis

Jalankan tugas pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Maren Suilmann adalah Ilmuwan Data di Layanan Profesional AWS. Dia bekerja dengan pelanggan di berbagai industri untuk mengungkap kekuatan AI/ML untuk mencapai hasil bisnis mereka. Maren telah bergabung dengan AWS sejak November 2019. Di waktu senggangnya, dia menikmati kickboxing, hiking ke pemandangan indah, dan malam permainan papan.


Jalankan tugas pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Maira Ladeira Tanke
adalah Spesialis ML di AWS. Dengan latar belakang ilmu data, dia memiliki 9 tahun pengalaman merancang dan membangun aplikasi ML dengan pelanggan di berbagai industri. Sebagai pimpinan teknis, dia membantu pelanggan mempercepat pencapaian nilai bisnis mereka melalui teknologi baru dan solusi inovatif. Di waktu luangnya, Maira senang bepergian dan menghabiskan waktu bersama keluarganya di tempat yang hangat.


Jalankan tugas pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Pauline Ting
adalah Ilmuwan Data di Layanan Profesional AWS tim. Dia mendukung pelanggan dalam mencapai dan mempercepat hasil bisnis mereka dengan mengembangkan solusi AI/ML. Di waktu senggangnya, Pauline senang bepergian, berselancar, dan mencoba tempat pencuci mulut baru.


Jalankan tugas pemrosesan yang aman menggunakan PySpark di Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Pencarian Vertikal. Ai.Donald Fossouo
adalah Arsitek Data Sr di Layanan Profesional AWS tim, sebagian besar bekerja dengan Global Finance Service. Dia terlibat dengan pelanggan untuk menciptakan solusi inovatif yang mengatasi masalah bisnis pelanggan dan mempercepat penerapan layanan AWS. Di waktu luangnya, Donald senang membaca, berlari, dan jalan-jalan.

Stempel Waktu:

Lebih dari Pembelajaran Mesin AWS