Выполнение безопасных заданий обработки с помощью PySpark в Amazon SageMaker Pipelines

Выполнение безопасных заданий обработки с помощью PySpark в Amazon SageMaker Pipelines

Студия Amazon SageMaker может помочь вам создавать, обучать, отлаживать, развертывать и отслеживать ваши модели, а также управлять рабочими процессами машинного обучения (ML). Конвейеры Amazon SageMaker позволяет построить безопасная, масштабируемая и гибкая платформа MLOps внутри Студии.

В этом посте мы объясним, как запускать задания обработки PySpark в конвейере. Это позволяет любому, кто хочет обучить модель с помощью Pipelines, также выполнять предварительную обработку обучающих данных, постобработку данных логического вывода или оценку моделей с помощью PySpark. Эта возможность особенно актуальна, когда вам нужно обрабатывать большие объемы данных. Кроме того, мы покажем, как оптимизировать шаги PySpark с помощью конфигураций и журналов пользовательского интерфейса Spark.

Трубопроводы – это Создатель мудреца Амазонки инструмент для создания и управления сквозными конвейерами машинного обучения. Это полностью управляемый сервис по запросу, интегрированный с SageMaker и другими сервисами AWS, который создает ресурсы и управляет ими для вас. Это гарантирует, что экземпляры подготавливаются и используются только при запуске конвейеров. Кроме того, Pipelines поддерживается SDK для SageMaker Python, позволяя отслеживать происхождение данных и повторное использование шагов кэшируя их, чтобы сократить время и стоимость разработки. Конвейер SageMaker может использовать этапы обработки для обработки данных или выполнения оценки модели.

При обработке крупномасштабных данных специалисты по данным и инженеры машинного обучения часто используют ПиСпарк, интерфейс для Apache Spark в Питоне. SageMaker предоставляет готовые образы Docker, которые включают PySpark и другие зависимости, необходимые для выполнения заданий по распределенной обработке данных, включая преобразование данных и разработку функций с использованием платформы Spark. Хотя эти образы позволяют быстро начать использовать PySpark для обработки заданий, для крупномасштабной обработки данных часто требуются определенные конфигурации Spark, чтобы оптимизировать распределенные вычисления кластера, созданного SageMaker.

В нашем примере мы создаем конвейер SageMaker, выполняющий один этап обработки. Дополнительные сведения о том, какие еще шаги можно добавить в конвейер, см. Шаги конвейера.

Библиотека обработки SageMaker

SageMaker Processing может работать с определенными каркасы (например, SKlearnProcessor, PySparkProcessor или Hugging Face). Независимо от используемого фреймворка каждый Шаг обработки требуется следующее:

  • Имя шага – Имя, которое будет использоваться для шага конвейера SageMaker.
  • Аргументы шага - Аргументы для вашего ProcessingStep

Дополнительно вы можете предоставить следующее:

  • Конфигурация вашего кэша шагов, чтобы избежать ненужных запусков вашего шага в конвейере SageMaker.
  • Список имен шагов, экземпляров шагов или экземпляров набора шагов, которые ProcessingStep зависит от
  • Отображаемое имя ProcessingStep
  • Описание ProcessingStep
  • Файлы свойств
  • Политики повтора

Аргументы переданы в ProcessingStep, Вы можете использовать sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor класс для запуска приложения Spark внутри задания обработки.

У каждого процессора свои потребности, в зависимости от фреймворка. Лучше всего это иллюстрируется с помощью PySparkProcessor, где вы можете передать дополнительную информацию для оптимизации ProcessingStep далее, например, через configuration параметр при запуске вашего задания.

Запускайте задания SageMaker Processing в безопасной среде.

Это лучшая практика чтобы создать частный Amazon VPC и настроить его так, чтобы ваши задания не были доступны через общедоступный Интернет. Задания SageMaker Processing позволяют указать частные подсети и группы безопасности в вашем VPC, а также включить сетевую изоляцию и шифрование трафика между контейнерами с помощью NetworkConfig.VpcConfig параметр запроса CreateProcessingJob API. Мы приводим примеры этой конфигурации, используя SageMaker SDK в следующем разделе.

PySpark ProcessingStep в SageMaker Pipelines

В этом примере мы предполагаем, что Studio развернута в уже доступной безопасной среде, включая VPC, конечные точки VPC, группы безопасности, Управление идентификацией и доступом AWS (IAM) роли и Служба управления ключами AWS (AWS KMS) ключи. Мы также предполагаем, что у вас есть две корзины: одна для артефактов, таких как код и журналы, и одна для ваших данных. Basic_infra.yaml файл содержит пример AWS CloudFormation код для предоставления необходимой предварительной инфраструктуры. Пример кода и руководство по развертыванию также доступны на GitHub.

В качестве примера мы настроили конвейер, содержащий один ProcessingStep в котором мы просто читаем и пишем набор данных морского ушка с помощью Спарка. В примерах кода показано, как установить и настроить ProcessingStep.

Мы определяем параметры конвейера (имя, роль, сегменты и т. д.) и настройки для конкретных шагов (тип и количество экземпляров, версию фреймворка и т. д.). В этом примере мы используем безопасную настройку, а также определяем подсети, группы безопасности и шифрование трафика между контейнерами. Для этого примера вам нужна роль выполнения конвейера с полным доступом к SageMaker и VPC. См. следующий код:

{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}

Чтобы продемонстрировать, следующий пример кода запускает сценарий PySpark в SageMaker Processing в конвейере с помощью PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()

Как показано в предыдущем коде, мы перезаписываем конфигурации Spark по умолчанию, предоставляя configuration.json как ProcessingInput. Мы используем configuration.json файл, который был сохранен в Простой сервис хранения Amazon (Amazon S3) со следующими настройками:

[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]

Мы можем обновить конфигурацию Spark по умолчанию, либо передав файл как ProcessingInput или с помощью аргумента конфигурации при запуске run() функции.

Конфигурация Spark зависит от других параметров, таких как тип экземпляра и количество экземпляров, выбранных для задания обработки. Первое соображение — это количество инстансов, количество ядер виртуального ЦП, которое есть у каждого из этих инстансов, и объем памяти инстанса. Вы можете использовать Интерфейсы Spark or Показатели экземпляра CloudWatch и журналы для калибровки этих значений в течение нескольких итераций запуска.

Кроме того, настройки исполнителя и драйвера можно оптимизировать еще больше. Пример того, как их рассчитать, см. Рекомендации по успешному управлению памятью для приложений Apache Spark в Amazon EMR.

Далее, в отношении настроек драйвера и исполнителя, мы рекомендуем изучить настройки коммиттера, чтобы повысить производительность при записи в Amazon S3. В нашем случае мы записываем файлы Parquet в Amazon S3 и устанавливаем «spark.sql.parquet.fs.optimized.comitter.optimization-enabled»К истине.

Если необходимо для подключения к Amazon S3, региональная конечная точка «spark.hadoop.fs.s3a.endpoint” можно указать в файле конфигурации.

В этом примере конвейера скрипт PySpark spark_process.py (как показано в следующем коде) загружает CSV-файл из Amazon S3 во фрейм данных Spark и сохраняет данные как Parquet обратно в Amazon S3.

Обратите внимание, что конфигурация в нашем примере не пропорциональна рабочей нагрузке, потому что чтение и запись набора данных морского ушка могут выполняться с настройками по умолчанию на одном экземпляре. Конфигурации, которые мы упомянули, должны быть определены на основе ваших конкретных потребностей.

# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")

Чтобы погрузиться в оптимизацию заданий обработки Spark, вы можете использовать журналы CloudWatch, а также пользовательский интерфейс Spark. Пользовательский интерфейс Spark можно создать, запустив задание обработки в экземпляре записной книжки SageMaker. Вы можете просмотреть Пользовательский интерфейс Spark для заданий обработки, выполняемых в конвейере by запуск сервера истории в экземпляре блокнота SageMaker, если журналы пользовательского интерфейса Spark были сохранены в том же расположении Amazon S3.

Убирать

Если вы следовали руководству, рекомендуется удалить ресурсы, которые больше не используются, чтобы не взимать плату. Убедись в удалить стек CloudFormation которые вы использовали для создания своих ресурсов. Это удалит созданный стек, а также созданные им ресурсы.

Заключение

В этом посте мы показали, как запустить безопасное задание обработки SageMaker с помощью PySpark в SageMaker Pipelines. Мы также продемонстрировали, как оптимизировать PySpark с помощью конфигураций Spark и настроить задание обработки для запуска в защищенной сетевой конфигурации.

В качестве следующего шага изучите, как автоматизировать весь жизненный цикл модели и как клиенты создали безопасные и масштабируемые платформы MLOps с помощью сервисов SageMaker.


Об авторах

Выполняйте задания безопасной обработки с помощью PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Марен Сульманн является специалистом по данным в Профессиональные услуги AWS. Она работает с клиентами из разных отраслей, раскрывая возможности искусственного интеллекта и машинного обучения для достижения их бизнес-результатов. Марен работает в AWS с ноября 2019 года. В свободное время она любит заниматься кикбоксингом, ходить в походы и проводить вечера с настольными играми.


Выполняйте задания безопасной обработки с помощью PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Майра Ладейра Танке
является специалистом по машинному обучению в AWS. Имея опыт работы в области науки о данных, она имеет 9-летний опыт разработки и создания приложений машинного обучения для клиентов из разных отраслей. В качестве технического руководителя она помогает клиентам ускорить достижение ценности бизнеса с помощью новых технологий и инновационных решений. В свободное время Майра любит путешествовать и проводить время со своей семьей в теплом месте.


Выполняйте задания безопасной обработки с помощью PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Полин Тинг
Data Scientist в Профессиональные услуги AWS команда. Она поддерживает клиентов в достижении и ускорении их бизнес-результатов путем разработки решений AI/ML. В свободное время Полина любит путешествовать, заниматься серфингом и пробовать новые десерты.


Выполняйте задания безопасной обработки с помощью PySpark в Amazon SageMaker Pipelines PlatoBlockchain Data Intelligence. Вертикальный поиск. Ай.Дональд Фоссуо
является старшим архитектором данных в Профессиональные услуги AWS команда, в основном работающая с Global Finance Service. Он взаимодействует с клиентами для создания инновационных решений, направленных на решение бизнес-проблем клиентов и ускорение внедрения сервисов AWS. В свободное время Дональд любит читать, бегать и путешествовать.

Отметка времени:

Больше от Машинное обучение AWS