Виконуйте безпечні завдання обробки за допомогою PySpark у Amazon SageMaker Pipelines

Виконуйте безпечні завдання обробки за допомогою PySpark у Amazon SageMaker Pipelines

Студія Amazon SageMaker може допомогти вам створювати, навчати, налагоджувати, розгортати та контролювати свої моделі та керувати робочими процесами машинного навчання (ML). Трубопроводи Amazon SageMaker дозволяє побудувати a безпечна, масштабована та гнучка платформа MLOps в межах Студії.

У цій публікації ми пояснюємо, як запускати завдання обробки PySpark у конвеєрі. Це дає змогу будь-кому, хто хоче навчити модель за допомогою конвеєрів, також попередньо обробляти навчальні дані, післяобробляти дані висновків або оцінювати моделі за допомогою PySpark. Ця можливість особливо актуальна, коли потрібно обробити великомасштабні дані. Крім того, ми демонструємо, як оптимізувати кроки PySpark за допомогою конфігурацій і журналів інтерфейсу Spark.

Трубопроводи - це Amazon SageMaker інструмент для створення та керування наскрізними конвеєрами машинного навчання. Це повністю керована служба на вимогу, інтегрована з SageMaker та іншими службами AWS, і тому створює ресурси та керує ними для вас. Це гарантує, що екземпляри надаються та використовуються лише під час запуску конвеєрів. Крім того, Pipelines підтримується SageMaker Python SDK, дозволяючи відстежувати ваші родовід даних та кроки повторного використання кешуючи їх, щоб полегшити час і витрати на розробку. Можна використовувати конвеєр SageMaker етапи обробки для обробки даних або виконання оцінки моделі.

Під час обробки великомасштабних даних науковці з даних та інженери ML часто використовують PySpark, інтерфейс для Apache Spark в Python. SageMaker надає готові образи Docker, які включають PySpark та інші залежності, необхідні для виконання розподілених завдань обробки даних, включаючи перетворення даних і розробку функцій за допомогою інфраструктури Spark. Хоча ці зображення дозволяють швидко почати використовувати PySpark у завданнях обробки, велика обробка даних часто вимагає спеціальних конфігурацій Spark, щоб оптимізувати розподілені обчислення кластера, створеного SageMaker.

У нашому прикладі ми створюємо конвеєр SageMaker, який виконує один крок обробки. Додаткову інформацію про те, які ще кроки можна додати до конвеєра, див Кроки трубопроводу.

Бібліотека обробки SageMaker

SageMaker Processing може працювати зі спеціальними каркаси (наприклад, SKlearnProcessor, PySparkProcessor або Hugging Face). Незалежно від використовуваної рамки, кожен Крок обробки вимагає наступного:

  • Назва кроку – Ім’я, яке буде використовуватися для кроку конвеєра SageMaker
  • Покрокові аргументи – Ваші аргументи ProcessingStep

Додатково ви можете надати наступне:

  • Конфігурація вашого кешу кроків, щоб уникнути непотрібних запусків вашого кроку в конвеєрі SageMaker
  • Список імен кроків, екземплярів кроків або екземплярів колекції кроків, які ProcessingStep залежить від
  • Відображуване ім'я ProcessingStep
  • Опис ProcessingStep
  • Файли власності
  • Повторіть політику

Аргументи передані в ProcessingStep. Ви можете використовувати sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor клас для запуску вашої програми Spark всередині завдання обробки.

Кожен процесор має свої власні потреби, залежно від структури. Найкраще це можна проілюструвати за допомогою PySparkProcessor, куди можна передати додаткову інформацію для оптимізації ProcessingStep далі, наприклад, через configuration під час виконання завдання.

Виконуйте завдання обробки SageMaker у безпечному середовищі

Це краща практика щоб створити приватний Amazon VPC і налаштувати його так, щоб ваші завдання не були доступні через загальнодоступний Інтернет. Завдання обробки SageMaker дозволяють указати приватні підмережі та групи безпеки у вашому VPC, а також увімкнути ізоляцію мережі та шифрування трафіку між контейнерами за допомогою NetworkConfig.VpcConfig параметр запиту CreateProcessingJob API. Ми надаємо приклади цієї конфігурації за допомогою SageMaker SDK у наступному розділі.

PySpark ProcessingStep у конвеєрах SageMaker

У цьому прикладі ми припускаємо, що ви розгорнули Studio у вже доступному безпечному середовищі, включаючи VPC, кінцеві точки VPC, групи безпеки, Управління ідентифікацією та доступом AWS (IAM) ролі та Служба управління ключами AWS (AWS KMS). Ми також припускаємо, що у вас є два сегменти: один для артефактів, таких як код і журнали, і інший для ваших даних. The basic_infra.yaml файл містить приклад AWS CloudFormation код для забезпечення необхідної попередньої інфраструктури. Приклад коду та посібник із розгортання також доступні на GitHub.

Як приклад, ми створили конвеєр, що містить один ProcessingStep у якому ми просто читаємо та пишемо набір даних морського вушка за допомогою Spark. Зразки коду показують, як встановити та налаштувати ProcessingStep.

Ми визначаємо параметри для конвеєра (ім’я, роль, сегменти тощо) і параметри для окремих кроків (тип і кількість екземплярів, версія фреймворку тощо). У цьому прикладі ми використовуємо безпечне налаштування, а також визначаємо підмережі, групи безпеки та шифрування трафіку між контейнерами. Для цього прикладу вам потрібна роль конвеєрного виконання з повним доступом SageMaker і VPC. Перегляньте наступний код:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Щоб продемонструвати, наведений нижче приклад коду запускає сценарій PySpark на обробці SageMaker у конвеєрі за допомогою PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Як показано в попередньому коді, ми перезаписуємо стандартні конфігурації Spark, надаючи configuration.json як ProcessingInput. Ми використовуємо a configuration.json файл, який було збережено в Служба простого зберігання Amazon (Amazon S3) із такими налаштуваннями:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Ми можемо оновити конфігурацію Spark за замовчуванням, передавши файл як a ProcessingInput або використовуючи аргумент конфігурації під час запуску run() функції.

Конфігурація Spark залежить від інших параметрів, наприклад типу екземпляра та кількості екземплярів, вибраних для завдання обробки. По-перше, це кількість екземплярів, ядер vCPU, які має кожен із цих екземплярів, і пам’ять екземпляра. Ви можете використовувати Інтерфейси користувача Spark or Показники екземпляра CloudWatch і журнали для калібрування цих значень протягом кількох ітерацій запуску.

Крім того, параметри виконавця та драйвера можна ще більше оптимізувати. Приклад того, як їх обчислити, див Найкращі методи успішного керування пам’яттю для програм Apache Spark на Amazon EMR.

Далі, для налаштувань драйвера та виконавця, ми рекомендуємо дослідити налаштування комітора, щоб покращити продуктивність під час запису в Amazon S3. У нашому випадку ми записуємо файли Parquet на Amazon S3 і встановлюємо «spark.sql.parquet.fs.optimized.comitter.optimization-enabled” на правду.

Якщо потрібно для підключення до Amazon S3, регіональна кінцева точка "spark.hadoop.fs.s3a.endpoint” можна вказати у файлі конфігурацій.

У цьому прикладі конвеєра, сценарій PySpark spark_process.py (як показано в наступному коді) завантажує файл CSV з Amazon S3 у кадр даних Spark і зберігає дані як Parquet назад в Amazon S3.

Зауважте, що конфігурація нашого прикладу не пропорційна робочому навантаженню, оскільки читання та запис набору даних морського вушка можна було б виконати за умовчанням в одному екземплярі. Згадані конфігурації слід визначати на основі ваших конкретних потреб.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Щоб зануритися в оптимізацію завдань обробки Spark, ви можете використовувати журнали CloudWatch, а також інтерфейс користувача Spark. Ви можете створити інтерфейс користувача Spark, запустивши завдання обробки на примірнику блокнота SageMaker. Ви можете переглянути Інтерфейс Spark для завдань обробки, що виконуються в конвеєрі by запуск сервера історії в екземплярі блокнота SageMaker, якщо журнали Spark UI були збережені в тому самому розташуванні Amazon S3.

Прибирати

Якщо ви дотримувалися підручника, радимо видалити ресурси, які більше не використовуються, щоб припинити стягнення плати. Переконайтеся, що видалити стек CloudFormation які ви використовували для створення своїх ресурсів. Це видалить створений стек, а також створені ним ресурси.

Висновок

У цій публікації ми показали, як запустити безпечне завдання SageMaker Processing за допомогою PySpark у SageMaker Pipelines. Ми також продемонстрували, як оптимізувати PySpark за допомогою конфігурацій Spark і налаштувати ваше завдання обробки для виконання в конфігурації безпечної мережі.

Як наступний крок дослідіть, як автоматизувати весь життєвий цикл моделі та яким чином клієнти створили безпечні та масштабовані платформи MLOps за допомогою сервісів SageMaker.


Про авторів

Виконуйте безпечні завдання обробки за допомогою PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Марен Зюльман є спеціалістом із обробки даних у Професійні послуги AWS. Вона працює з клієнтами в різних галузях, відкриваючи можливості ШІ/ML ​​для досягнення результатів у бізнесі. Марен працює в AWS з листопада 2019 року. У вільний час вона любить кікбоксинг, піші прогулянки до чудових краєвидів і вечірні настільні ігри.


Виконуйте безпечні завдання обробки за допомогою PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Майра Ладейра Танке
є спеціалістом з ML в AWS. Маючи досвід роботи з даними, вона має 9-річний досвід проектування та створення програм машинного навчання з клієнтами в різних галузях. Як технічний керівник, вона допомагає клієнтам прискорити досягнення бізнес-цінності за допомогою нових технологій та інноваційних рішень. У вільний час Майра любить подорожувати та проводити час із сім’єю в теплому місці.


Виконуйте безпечні завдання обробки за допомогою PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Полін Тінг
є спеціалістом із обробки даних у Професійні послуги AWS команда. Вона підтримує клієнтів у досягненні та прискоренні бізнес-результатів шляхом розробки рішень AI/ML. У вільний час Поліна любить подорожувати, займатися серфінгом і пробувати нові десертні заклади.


Виконуйте безпечні завдання обробки за допомогою PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальний пошук. Ai.Дональд Фоссуо
є старшим архітектором даних у Професійні послуги AWS команда, яка переважно працює з Global Finance Service. Він співпрацює з клієнтами для створення інноваційних рішень, які вирішують бізнес-проблеми клієнтів і прискорюють впровадження послуг AWS. У вільний час Дональд любить читати, бігати та подорожувати.

Часова мітка:

Більше від AWS Машинне навчання