Amazon SageMaker Pipelines'ta PySpark kullanarak güvenli işleme işleri çalıştırın

Amazon SageMaker Pipelines'ta PySpark kullanarak güvenli işleme işleri çalıştırın

Amazon SageMaker Stüdyosu modellerinizi oluşturmanıza, eğitmenize, hata ayıklamanıza, devreye almanıza ve izlemenize ve makine öğrenimi (ML) iş akışlarınızı yönetmenize yardımcı olabilir. Amazon SageMaker Ardışık Düzenleri inşa etmenizi sağlar güvenli, ölçeklenebilir ve esnek MLOps platformu Studio içinde.

Bu gönderide, PySpark işleme işlerinin bir işlem hattı içinde nasıl çalıştırılacağını açıklıyoruz. Bu, Pipelines kullanarak bir model eğitmek isteyen herkesin eğitim verilerini, işlem sonrası çıkarım verilerini önceden işlemesini veya PySpark kullanarak modelleri değerlendirmesini sağlar. Bu yetenek, özellikle büyük ölçekli verileri işlemeniz gerektiğinde geçerlidir. Ek olarak, konfigürasyonları ve Spark UI günlüklerini kullanarak PySpark adımlarınızı nasıl optimize edeceğinizi gösteriyoruz.

Boru hatları bir Amazon Adaçayı Yapıcı uçtan uca makine öğrenimi ardışık düzenleri oluşturmaya ve yönetmeye yönelik araç. SageMaker ve diğer AWS hizmetleriyle entegre, tamamen yönetilen, isteğe bağlı bir hizmettir ve bu nedenle sizin için kaynakları oluşturur ve yönetir. Bu, örneklerin yalnızca işlem hatları çalıştırılırken sağlanmasını ve kullanılmasını sağlar. Ayrıca, Pipelines tarafından desteklenmektedir SageMaker Python SDK'sı, izlemenizi sağlar veri soyu ve adımları yeniden kullan geliştirme süresini ve maliyetini kolaylaştırmak için bunları önbelleğe alarak. Bir SageMaker ardışık düzeni kullanabilir işlem adımları verileri işlemek veya model değerlendirmesi yapmak için.

Büyük ölçekli verileri işlerken, veri bilimcileri ve makine öğrenimi mühendisleri genellikle PyKıvılcım, için bir arayüz Apache Spark Python'da. SageMaker, veri dönüşümleri ve Spark çerçevesini kullanarak özellik mühendisliği dahil olmak üzere dağıtılmış veri işleme işlerini çalıştırmak için gereken PySpark ve diğer bağımlılıkları içeren önceden oluşturulmuş Docker görüntüleri sağlar. Bu görüntüler, işlerin işlenmesinde PySpark'ı hızlı bir şekilde kullanmaya başlamanıza izin verse de, büyük ölçekli veri işleme, SageMaker tarafından oluşturulan kümenin dağıtılmış hesaplamasını optimize etmek için genellikle belirli Spark yapılandırmaları gerektirir.

Örneğimizde, tek bir işleme adımını çalıştıran bir SageMaker ardışık düzeni oluşturuyoruz. Bir ardışık düzene başka hangi adımları ekleyebileceğiniz hakkında daha fazla bilgi için bkz. Boru Hattı Adımları.

SageMaker İşleme kitaplığı

SageMaker Processing, belirli çerçeveler (örneğin, SKlearnProcessor, PySparkProcessor veya Hugging Face). Kullanılan çerçeveden bağımsız olarak, her İşlemeAdımı aşağıdakileri gerektirir:

  • adım adı – SageMaker işlem hattı adımınız için kullanılacak ad
  • Adım bağımsız değişkenleri – Sizin için argümanlar ProcessingStep

Ek olarak, aşağıdakileri sağlayabilirsiniz:

  • Bir SageMaker ardışık düzeninde adımınızın gereksiz şekilde çalıştırılmasını önlemek için adım önbelleğinizin yapılandırması
  • Adım adlarının, adım örneklerinin veya adım koleksiyonu örneklerinin listesi. ProcessingStep göre değişir
  • Görünen adı ProcessingStep
  • Bir açıklama ProcessingStep
  • Özellik dosyaları
  • Yeniden deneme politikaları

Argümanlar teslim edilir ProcessingStep. Kullanabilirsiniz sagemaker.spark.PySparkİşlemci or sagemaker.spark.SparkJarİşlemci Spark uygulamanızı bir işleme işinin içinde çalıştırmak için sınıf.

Her işlemci, çerçeveye bağlı olarak kendi gereksinimleriyle birlikte gelir. Bu en iyi şekilde şu şekilde gösterilir: PySparkProcessoroptimize etmek için ek bilgiler iletebileceğiniz ProcessingStep ayrıca, örneğin aracılığıyla configuration işinizi çalıştırırken parametre.

SageMaker İşleme işlerini güvenli bir ortamda çalıştırın

Bu kadar En iyi uygulama özel bir Amazon VPC oluşturmak ve işlerinize genel internet üzerinden erişilemeyecek şekilde yapılandırmak için. SageMaker İşleme işleri, VPC'nizdeki özel alt ağları ve güvenlik gruplarını belirtmenize ve ayrıca ağ izolasyonunu ve kapsayıcılar arası trafik şifrelemesini etkinleştirmenize olanak tanır. NetworkConfig.VpcConfig istek parametresi CreateProcessingJob API. Kullanarak bu yapılandırmanın örneklerini sunuyoruz. Adaçayı Yapıcı SDK'sı sonraki bölümde.

PySpark ProcessingSageMaker Pipelines içindeki Adım

Bu örnekte, Studio'nun VPC, VPC uç noktaları, güvenlik grupları, AWS Kimlik ve Erişim Yönetimi (IAM) rolleri ve AWS Anahtar Yönetim Hizmeti (AWS KMS) tuşları. Ayrıca iki klasörünüz olduğunu varsayıyoruz: biri kod ve günlükler gibi yapıtlar için, diğeri verileriniz için. bu basic_infra.yaml dosya örnek sağlar AWS CloudFormation gerekli önkoşul altyapısını sağlamak için kod. Örnek kod ve devreye alma kılavuzu şu adreste de mevcuttur: GitHub.

Örnek olarak, tek bir içeren bir ardışık düzen kurduk. ProcessingStep sadece okuyup yazdığımız deniz kulağı veri kümesi Spark'ı kullanarak. Kod örnekleri, nasıl kurulacağını ve yapılandırılacağını gösterir. ProcessingStep.

Ardışık düzen için parametreler (ad, rol, kovalar vb.) ve adıma özel ayarlar (örnek türü ve sayısı, çerçeve sürümü vb.) tanımlarız. Bu örnekte, güvenli bir kurulum kullanıyoruz ve ayrıca alt ağları, güvenlik gruplarını ve kapsayıcılar arası trafik şifrelemesini tanımlıyoruz. Bu örnek için, SageMaker tam erişimi ve bir VPC ile bir boru hattı yürütme rolüne ihtiyacınız var. Aşağıdaki koda bakın:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Göstermek için, aşağıdaki kod örneği, SageMaker Processing üzerinde bir işlem hattı içinde bir PySpark betiği çalıştırır. PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Önceki kodda gösterildiği gibi, sağlayarak varsayılan Spark yapılandırmalarının üzerine yazıyoruz. configuration.json bir şekilde ProcessingInput. biz bir configuration.json kaydedilen dosya Amazon Basit Depolama Hizmeti (Amazon S3) aşağıdaki ayarlarla:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Varsayılan Spark yapılandırmasını, dosyayı bir dosya olarak geçirerek güncelleyebiliriz. ProcessingInput veya çalıştırırken yapılandırma bağımsız değişkenini kullanarak run() fonksiyonu.

Spark yapılandırması, işleme işi için seçilen örnek türü ve örnek sayısı gibi diğer seçeneklere bağlıdır. İlk husus, bulut sunucusu sayısı, bu örneklerin her birinin sahip olduğu vCPU çekirdekleri ve bulut sunucusu belleğidir. Kullanabilirsiniz Kıvılcım kullanıcı arayüzleri or CloudWatch örnek ölçümleri ve bu değerleri birden çok çalıştırma yinelemesinde kalibre etmek için günlükler.

Ayrıca yürütücü ve sürücü ayarları daha da optimize edilebilir. Bunların nasıl hesaplanacağına ilişkin bir örnek için bkz. Amazon EMR'de Apache Spark uygulamaları için belleği başarıyla yönetmeye yönelik en iyi uygulamalar.

Daha sonra, sürücü ve yürütücü ayarları için, Amazon S3'e yazarken performansı artırmak amacıyla işlemci ayarlarını incelemenizi öneririz. Bizim durumumuzda, Parke dosyalarını Amazon S3'e yazıyoruz ve “spark.sql.parquet.fs.optimized.comitter.optimization-enabled"Doğru.

Amazon S3'e bağlantı için gerekirse, bölgesel bir uç nokta “spark.hadoop.fs.s3a.endpoint” yapılandırma dosyası içinde belirtilebilir.

Bu örnek ardışık düzende, PySpark betiği spark_process.py (aşağıdaki kodda gösterildiği gibi), Amazon S3'ten bir Spark veri çerçevesine bir CSV dosyası yükler ve verileri Parquet olarak Amazon S3'e geri kaydeder.

Abalone veri kümesinin okunması ve yazılması bir örnekteki varsayılan ayarlarda yapılabileceğinden, örnek yapılandırmamızın iş yüküyle orantılı olmadığına dikkat edin. Bahsettiğimiz konfigürasyonlar, özel ihtiyaçlarınıza göre tanımlanmalıdır.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Spark işleme işlerini optimize etmeye dalmak için CloudWatch günlüklerini ve Spark kullanıcı arayüzünü kullanabilirsiniz. Bir SageMaker not defteri örneğinde bir İşleme işi çalıştırarak Spark kullanıcı arayüzünü oluşturabilirsiniz. görüntüleyebilirsiniz Bir ardışık düzende çalışan İşleme işleri için Spark UI by geçmiş sunucusunu çalıştırma Spark UI günlükleri aynı Amazon S3 konumuna kaydedilmişse bir SageMaker not defteri eşgörünümü içinde.

Temizlemek

Öğreticiyi izlediyseniz, artık kullanılmayan kaynakları silmek, ücretlendirmeyi durdurmak için iyi bir uygulamadır. Emin ol CloudFormation yığınını silin kaynaklarınızı oluşturmak için kullandığınız Bu, oluşturulan yığının yanı sıra oluşturduğu kaynakları da siler.

Sonuç

Bu gönderide, SageMaker Pipelines içinde PySpark kullanarak güvenli bir SageMaker Processing işinin nasıl çalıştırılacağını gösterdik. Ayrıca, Spark yapılandırmalarını kullanarak PySpark'ı nasıl optimize edeceğinizi ve İşleme işinizi güvenli bir ağ yapılandırmasında çalışacak şekilde nasıl kuracağınızı da gösterdik.

Bir sonraki adım olarak, tüm model yaşam döngüsünün nasıl otomatikleştirileceğini ve müşteriler güvenli ve ölçeklenebilir MLOps platformları oluşturdu SageMaker hizmetlerini kullanarak.


Yazarlar Hakkında

Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maren Suilmann şirketinde Veri Bilimcisi AWS Profesyonel Hizmetleri. İş sonuçlarına ulaşmak için AI/ML'nin gücünü ortaya çıkaran sektörlerdeki müşterilerle birlikte çalışır. Maren, Kasım 2019'dan beri AWS'de çalışıyor. Boş zamanlarında kickboks yapmaktan, harika manzaralara yürüyüş yapmaktan ve masa oyunu gecelerinden hoşlanıyor.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Maira Ladeira Tankı
AWS'de bir Makine Öğrenimi Uzmanıdır. Veri bilimi geçmişiyle, farklı sektörlerden müşterilerle makine öğrenimi uygulamaları tasarlama ve oluşturma konusunda 9 yıllık deneyime sahiptir. Teknik lider olarak, gelişen teknolojiler ve yenilikçi çözümler aracılığıyla müşterilerin iş değerine ulaşmalarını hızlandırmalarına yardımcı olur. Maira, boş zamanlarında seyahat etmeyi ve ailesiyle sıcak bir yerde vakit geçirmeyi seviyor.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Pauline Ting
Veri Bilimcisi AWS Profesyonel Hizmetleri takım. AI/ML çözümleri geliştirerek müşterilerin iş sonuçlarına ulaşmalarını ve bunları hızlandırmalarını destekliyor. Pauline boş zamanlarında seyahat etmeyi, sörf yapmayı ve yeni tatlı mekanları denemeyi seviyor.


Run secure processing jobs using PySpark in Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Vertical Search. Ai.Donald Fossouo
Kıdemli Veri Mimarı AWS Profesyonel Hizmetleri çoğunlukla Global Finance Service ile çalışan ekip. Müşterilerin iş sorunlarını ele alan ve AWS hizmetlerinin benimsenmesini hızlandıran yenilikçi çözümler oluşturmak için müşterilerle iletişim kurar. Donald boş zamanlarında kitap okumaktan, koşmaktan ve seyahat etmekten hoşlanır.

Zaman Damgası:

Den fazla AWS Makine Öğrenimi