Студия Amazon SageMaker может помочь вам создавать, обучать, отлаживать, развертывать и отслеживать ваши модели, а также управлять рабочими процессами машинного обучения (ML). Конвейеры Amazon SageMaker позволяет построить безопасная, масштабируемая и гибкая платформа MLOps внутри Студии.
В этом посте мы объясним, как запускать задания обработки PySpark в конвейере. Это позволяет любому, кто хочет обучить модель с помощью Pipelines, также выполнять предварительную обработку обучающих данных, постобработку данных логического вывода или оценку моделей с помощью PySpark. Эта возможность особенно актуальна, когда вам нужно обрабатывать большие объемы данных. Кроме того, мы покажем, как оптимизировать шаги PySpark с помощью конфигураций и журналов пользовательского интерфейса Spark.
Трубопроводы – это Создатель мудреца Амазонки инструмент для создания и управления сквозными конвейерами машинного обучения. Это полностью управляемый сервис по запросу, интегрированный с SageMaker и другими сервисами AWS, который создает ресурсы и управляет ими для вас. Это гарантирует, что экземпляры подготавливаются и используются только при запуске конвейеров. Кроме того, Pipelines поддерживается SDK для SageMaker Python, позволяя отслеживать происхождение данных и повторное использование шагов кэшируя их, чтобы сократить время и стоимость разработки. Конвейер SageMaker может использовать этапы обработки для обработки данных или выполнения оценки модели.
При обработке крупномасштабных данных специалисты по данным и инженеры машинного обучения часто используют ПиСпарк, интерфейс для Apache Spark в Питоне. SageMaker предоставляет готовые образы Docker, которые включают PySpark и другие зависимости, необходимые для выполнения заданий по распределенной обработке данных, включая преобразование данных и разработку функций с использованием платформы Spark. Хотя эти образы позволяют быстро начать использовать PySpark для обработки заданий, для крупномасштабной обработки данных часто требуются определенные конфигурации Spark, чтобы оптимизировать распределенные вычисления кластера, созданного SageMaker.
В нашем примере мы создаем конвейер SageMaker, выполняющий один этап обработки. Дополнительные сведения о том, какие еще шаги можно добавить в конвейер, см. Шаги конвейера.
Библиотека обработки SageMaker
SageMaker Processing может работать с определенными каркасы (например, SKlearnProcessor, PySparkProcessor или Hugging Face). Независимо от используемого фреймворка каждый Шаг обработки требуется следующее:
- Имя шага – Имя, которое будет использоваться для шага конвейера SageMaker.
- Аргументы шага - Аргументы для вашего
ProcessingStep
Дополнительно вы можете предоставить следующее:
- Конфигурация вашего кэша шагов, чтобы избежать ненужных запусков вашего шага в конвейере SageMaker.
- Список имен шагов, экземпляров шагов или экземпляров набора шагов, которые
ProcessingStep
зависит от - Отображаемое имя
ProcessingStep
- Описание
ProcessingStep
- Файлы свойств
- Политики повтора
Аргументы переданы в ProcessingStep
, Вы можете использовать sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor класс для запуска приложения Spark внутри задания обработки.
У каждого процессора свои потребности, в зависимости от фреймворка. Лучше всего это иллюстрируется с помощью PySparkProcessor
, где вы можете передать дополнительную информацию для оптимизации ProcessingStep
далее, например, через configuration
параметр при запуске вашего задания.
Запускайте задания SageMaker Processing в безопасной среде.
Это лучшая практика чтобы создать частный Amazon VPC и настроить его так, чтобы ваши задания не были доступны через общедоступный Интернет. Задания SageMaker Processing позволяют указать частные подсети и группы безопасности в вашем VPC, а также включить сетевую изоляцию и шифрование трафика между контейнерами с помощью NetworkConfig.VpcConfig
параметр запроса CreateProcessingJob
API. Мы приводим примеры этой конфигурации, используя SageMaker SDK в следующем разделе.
PySpark ProcessingStep в SageMaker Pipelines
В этом примере мы предполагаем, что Studio развернута в уже доступной безопасной среде, включая VPC, конечные точки VPC, группы безопасности, Управление идентификацией и доступом AWS (IAM) роли и Служба управления ключами AWS (AWS KMS) ключи. Мы также предполагаем, что у вас есть две корзины: одна для артефактов, таких как код и журналы, и одна для ваших данных. Basic_infra.yaml файл содержит пример AWS CloudFormation код для предоставления необходимой предварительной инфраструктуры. Пример кода и руководство по развертыванию также доступны на GitHub.
В качестве примера мы настроили конвейер, содержащий один ProcessingStep
в котором мы просто читаем и пишем набор данных морского ушка с помощью Спарка. В примерах кода показано, как установить и настроить ProcessingStep
.
Мы определяем параметры конвейера (имя, роль, сегменты и т. д.) и настройки для конкретных шагов (тип и количество экземпляров, версию фреймворка и т. д.). В этом примере мы используем безопасную настройку, а также определяем подсети, группы безопасности и шифрование трафика между контейнерами. Для этого примера вам нужна роль выполнения конвейера с полным доступом к SageMaker и VPC. См. следующий код:
{ "pipeline_name": "ProcessingPipeline", "trial": "test-blog-post", "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>", "network_subnet_ids": [ "subnet-<SUBNET_ID>", "subnet-<SUBNET_ID>" ], "network_security_group_ids": [ "sg-<SG_ID>" ], "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>", "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py", "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json", "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py", "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}", "pyspark_framework_version": "2.4", "pyspark_process_name": "pyspark-processing", "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv", "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output", "pyspark_process_instance_type": "ml.m5.4xlarge", "pyspark_process_instance_count": 6, "tags": { "Project": "tag-for-project", "Owner": "tag-for-owner" }
}
Чтобы продемонстрировать, следующий пример кода запускает сценарий PySpark в SageMaker Processing в конвейере с помощью PySparkProcessor
:
# import code requirements
# standard libraries import
import logging
import json # sagemaker model import
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config def create_pipeline(pipeline_params, logger): """ Args: pipeline_params (ml_pipeline.params.pipeline_params.py.Params): pipeline parameters logger (logger): logger Returns: () """ # Create SageMaker Session sagemaker_session = PipelineSession() # Get Tags tags_input = get_tags_input(pipeline_params["tags"]) # get network configuration network_config = get_network_configuration( subnets=pipeline_params["network_subnet_ids"], security_group_ids=pipeline_params["network_security_group_ids"] ) # Get Pipeline Configurations pipeline_config = get_pipeline_config(pipeline_params) # setting processing cache obj logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration 3 to 30 days") cache_config = CacheConfig(enable_caching=True, expire_after="p30d") # Create PySpark Processing Step logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor") # setting up spark processor processing_pyspark_processor = PySparkProcessor( base_job_name=pipeline_params["pyspark_process_name"], framework_version=pipeline_params["pyspark_framework_version"], role=pipeline_params["pipeline_role"], instance_count=pipeline_params["pyspark_process_instance_count"], instance_type=pipeline_params["pyspark_process_instance_type"], volume_kms_key=pipeline_params["pyspark_process_volume_kms"], output_kms_key=pipeline_params["pyspark_process_output_kms"], network_config=network_config, tags=tags_input, sagemaker_session=sagemaker_session ) # setting up arguments run_ags = processing_pyspark_processor.run( submit_app=pipeline_params["pyspark_process_code"], submit_py_files=[pipeline_params["pyspark_helper_code"]], arguments=[ # processing input arguments. To add new arguments to this list you need to provide two entrances: # 1st is the argument name preceded by "--" and the 2nd is the argument value # setting up processing arguments "--input_table", pipeline_params["pyspark_process_data_input"], "--output_table", pipeline_params["pyspark_process_data_output"] ], spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]), inputs = [ ProcessingInput( source=pipeline_params["spark_config_file"], destination="/opt/ml/processing/input/conf", s3_data_type="S3Prefix", s3_input_mode="File", s3_data_distribution_type="FullyReplicated", s3_compression_type="None" ) ], ) # create step pyspark_processing_step = ProcessingStep( name=pipeline_params["pyspark_process_name"], step_args=run_ags, cache_config=cache_config, ) # Create Pipeline pipeline = Pipeline( name=pipeline_params["pipeline_name"], steps=[ pyspark_processing_step ], pipeline_experiment_config=PipelineExperimentConfig( pipeline_params["pipeline_name"], pipeline_config["trial"] ), sagemaker_session=sagemaker_session ) pipeline.upsert( role_arn=pipeline_params["pipeline_role"], description="Example pipeline", tags=tags_input ) return pipeline def main(): # set up logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.info("Get Pipeline Parameter") with open("ml_pipeline/params/pipeline_params.json", "r") as f: pipeline_params = json.load(f) print(pipeline_params) logger.info("Create Pipeline") pipeline = create_pipeline(pipeline_params, logger=logger) logger.info("Execute Pipeline") execution = pipeline.start() return execution if __name__ == "__main__": main()
Как показано в предыдущем коде, мы перезаписываем конфигурации Spark по умолчанию, предоставляя configuration.json
как ProcessingInput
. Мы используем configuration.json
файл, который был сохранен в Простой сервис хранения Amazon (Amazon S3) со следующими настройками:
[ { "Classification":"spark-defaults", "Properties":{ "spark.executor.memory":"10g", "spark.executor.memoryOverhead":"5g", "spark.driver.memory":"10g", "spark.driver.memoryOverhead":"10g", "spark.driver.maxResultSize":"10g", "spark.executor.cores":5, "spark.executor.instances":5, "spark.yarn.maxAppAttempts":1 "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com", "spark.sql.parquet.fs.optimized.comitter.optimization-enabled":true } }
]
Мы можем обновить конфигурацию Spark по умолчанию, либо передав файл как ProcessingInput
или с помощью аргумента конфигурации при запуске run()
функции.
Конфигурация Spark зависит от других параметров, таких как тип экземпляра и количество экземпляров, выбранных для задания обработки. Первое соображение — это количество инстансов, количество ядер виртуального ЦП, которое есть у каждого из этих инстансов, и объем памяти инстанса. Вы можете использовать Интерфейсы Spark or Показатели экземпляра CloudWatch и журналы для калибровки этих значений в течение нескольких итераций запуска.
Кроме того, настройки исполнителя и драйвера можно оптимизировать еще больше. Пример того, как их рассчитать, см. Рекомендации по успешному управлению памятью для приложений Apache Spark в Amazon EMR.
Далее, в отношении настроек драйвера и исполнителя, мы рекомендуем изучить настройки коммиттера, чтобы повысить производительность при записи в Amazon S3. В нашем случае мы записываем файлы Parquet в Amazon S3 и устанавливаем «spark.sql.parquet.fs.optimized.comitter.optimization-enabled
»К истине.
Если необходимо для подключения к Amazon S3, региональная конечная точка «spark.hadoop.fs.s3a.endpoint
” можно указать в файле конфигурации.
В этом примере конвейера скрипт PySpark spark_process.py
(как показано в следующем коде) загружает CSV-файл из Amazon S3 во фрейм данных Spark и сохраняет данные как Parquet обратно в Amazon S3.
Обратите внимание, что конфигурация в нашем примере не пропорциональна рабочей нагрузке, потому что чтение и запись набора данных морского ушка могут выполняться с настройками по умолчанию на одном экземпляре. Конфигурации, которые мы упомянули, должны быть определены на основе ваших конкретных потребностей.
# import requirements
import argparse
import logging
import sys
import os
import pandas as pd # spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType from data_utils import( spark_read_parquet, Unbuffered
) sys.stdout = Unbuffered(sys.stdout) # Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO) def main(data_path): spark = SparkSession.builder.appName("PySparkJob").getOrCreate() spark.sparkContext.setLogLevel("ERROR") schema = StructType( [ StructField("sex", StringType(), True), StructField("length", FloatType(), True), StructField("diameter", FloatType(), True), StructField("height", FloatType(), True), StructField("whole_weight", FloatType(), True), StructField("shucked_weight", FloatType(), True), StructField("viscera_weight", FloatType(), True), StructField("rings", FloatType(), True), ] ) df = spark.read.csv(data_path, header=False, schema=schema) return df.select("sex", "length", "diameter", "rings") if __name__ == "__main__": logger.info(f"===============================================================") logger.info(f"================= Starting pyspark-processing =================") parser = argparse.ArgumentParser(description="app inputs") parser.add_argument("--input_table", type=str, help="path to the channel data") parser.add_argument("--output_table", type=str, help="path to the output data") args = parser.parse_args() df = main(args.input_table) logger.info("Writing transformed data") df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite") # save data df.coalesce(10).write.mode("overwrite").parquet(args.output_table) logger.info(f"================== Ending pyspark-processing ==================") logger.info(f"===============================================================")
Чтобы погрузиться в оптимизацию заданий обработки Spark, вы можете использовать журналы CloudWatch, а также пользовательский интерфейс Spark. Пользовательский интерфейс Spark можно создать, запустив задание обработки в экземпляре записной книжки SageMaker. Вы можете просмотреть Пользовательский интерфейс Spark для заданий обработки, выполняемых в конвейере by запуск сервера истории в экземпляре блокнота SageMaker, если журналы пользовательского интерфейса Spark были сохранены в том же расположении Amazon S3.
Убирать
Если вы следовали руководству, рекомендуется удалить ресурсы, которые больше не используются, чтобы не взимать плату. Убедись в удалить стек CloudFormation которые вы использовали для создания своих ресурсов. Это удалит созданный стек, а также созданные им ресурсы.
Заключение
В этом посте мы показали, как запустить безопасное задание обработки SageMaker с помощью PySpark в SageMaker Pipelines. Мы также продемонстрировали, как оптимизировать PySpark с помощью конфигураций Spark и настроить задание обработки для запуска в защищенной сетевой конфигурации.
В качестве следующего шага изучите, как автоматизировать весь жизненный цикл модели и как клиенты создали безопасные и масштабируемые платформы MLOps с помощью сервисов SageMaker.
Об авторах
Марен Сульманн является специалистом по данным в Профессиональные услуги AWS. Она работает с клиентами из разных отраслей, раскрывая возможности искусственного интеллекта и машинного обучения для достижения их бизнес-результатов. Марен работает в AWS с ноября 2019 года. В свободное время она любит заниматься кикбоксингом, ходить в походы и проводить вечера с настольными играми.
Майра Ладейра Танке является специалистом по машинному обучению в AWS. Имея опыт работы в области науки о данных, она имеет 9-летний опыт разработки и создания приложений машинного обучения для клиентов из разных отраслей. В качестве технического руководителя она помогает клиентам ускорить достижение ценности бизнеса с помощью новых технологий и инновационных решений. В свободное время Майра любит путешествовать и проводить время со своей семьей в теплом месте.
Полин Тинг Data Scientist в Профессиональные услуги AWS команда. Она поддерживает клиентов в достижении и ускорении их бизнес-результатов путем разработки решений AI/ML. В свободное время Полина любит путешествовать, заниматься серфингом и пробовать новые десерты.
Дональд Фоссуо является старшим архитектором данных в Профессиональные услуги AWS команда, в основном работающая с Global Finance Service. Он взаимодействует с клиентами для создания инновационных решений, направленных на решение бизнес-проблем клиентов и ускорение внедрения сервисов AWS. В свободное время Дональд любит читать, бегать и путешествовать.
- SEO-контент и PR-распределение. Получите усиление сегодня.
- Платоблокчейн. Интеллект метавселенной Web3. Расширение знаний. Доступ здесь.
- Источник: https://aws.amazon.com/blogs/machine-learning/run-secure-processing-jobs-using-pyspark-in-amazon-sagemaker-pipelines/
- :является
- $UP
- 1
- 10
- 100
- 2019
- 5G
- 7
- 9
- a
- О нас
- ускорять
- ускоряющий
- доступ
- доступной
- Достигать
- достижение
- достижение
- через
- дополнение
- дополнительный
- Дополнительная информация
- адрес
- Принятие
- AI / ML
- уже
- Несмотря на то, что
- Amazon
- Создатель мудреца Амазонки
- Конвейеры Amazon SageMaker
- и
- кто угодно
- апаш
- API
- приложение
- Применение
- Приложения
- МЫ
- аргумент
- Аргументы
- AS
- At
- автоматизировать
- доступен
- AWS
- назад
- фон
- основанный
- BE
- , так как:
- ЛУЧШЕЕ
- доска
- строить
- строитель
- Строительство
- построенный
- бизнес
- by
- Кэш
- вычислять
- CAN
- случаев
- Канал
- расходы
- выбранный
- класс
- классификация
- Кластер
- код
- лыжных шлемов
- COM
- вычисление
- Конфигурация
- связи
- рассмотрение
- Цена
- может
- Создайте
- создали
- создает
- Создающий
- изготовленный на заказ
- клиент
- Клиенты
- данным
- обработка данных
- наука о данных
- ученый данных
- Дней
- По умолчанию
- определенный
- демонстрировать
- убивают
- зависимый
- в зависимости
- зависит
- развертывание
- развернуть
- развертывание
- описание
- развивающийся
- Развитие
- Дисплей
- распределенный
- распределенных вычислений
- Docker
- водитель
- каждый
- или
- появление
- новые технологии
- включить
- позволяет
- шифрование
- впритык
- Конечная точка
- Проект и
- Инженеры
- обеспечивает
- Весь
- Окружающая среда
- ошибка
- особенно
- оценивать
- оценка
- Даже
- пример
- Примеры
- выполнять
- выполнение
- опыт
- Объяснять
- Больше
- Face
- семья
- Особенность
- Файл
- Файлы
- финансы
- Во-первых,
- гибкого
- следует
- после
- Что касается
- КАДР
- Рамки
- Бесплатно
- от
- FS
- полный
- полностью
- функция
- Функции
- далее
- Более того
- игра
- получить
- Глобальный
- хорошо
- большой
- Группы
- инструкция
- Есть
- высота
- помощь
- помогает
- история
- Как
- How To
- HTML
- HTTP
- HTTPS
- Личность
- изображений
- Импортировать
- импорт
- улучшать
- in
- включают
- В том числе
- независимые
- промышленности
- info
- информация
- Инфраструктура
- инновационный
- вход
- пример
- интегрированный
- Интерфейс
- Интернет
- изоляция
- IT
- итерации
- ЕГО
- работа
- Джобс
- JPG
- JSON
- Основные
- ключи
- крупномасштабный
- вести
- изучение
- Длина
- позволяя
- библиотеки
- Жизненный цикл
- такое как
- Список
- грузы
- расположение
- дольше
- машина
- обучение с помощью машины
- сделать
- управлять
- управляемого
- управление
- управляет
- управления
- Память
- упомянутый
- сообщение
- ML
- млн операций в секунду
- модель
- Модели
- монитор
- БОЛЕЕ
- с разными
- имя
- имена
- необходимо
- Необходимость
- потребности
- сеть
- сетей
- Новые
- следующий
- ноутбук
- Ноябрь
- номер
- of
- on
- On-Demand
- ONE
- Оптимизировать
- оптимизированный
- оптимизирующий
- Опции
- заказ
- OS
- Другое
- Результат
- выходной
- собственный
- владелец
- панд
- параметр
- параметры
- pass
- Прохождение
- путь
- Выполнять
- производительность
- трубопровод
- Мест
- Платон
- Платон Интеллектуальные данные
- ПлатонДанные
- После
- мощностью
- практика
- практиками
- частная
- проблемам
- процесс
- обработка
- процессор
- профессиональный
- Проект
- свойства
- обеспечивать
- приводит
- обеспечение
- обеспечение
- что такое варган?
- Питон
- быстро
- Читать
- Reading
- рекомендовать
- региональный
- соответствующие
- запросить
- Требования
- требуется
- Полезные ресурсы
- возвращают
- Возвращает
- Роли
- роли
- Run
- Бег
- sagemaker
- Конвейеры SageMaker
- то же
- Сохранить
- масштабируемые
- Наука
- Ученый
- Ученые
- Раздел
- безопасный
- безопасность
- обслуживание
- Услуги
- Сессия
- набор
- установка
- настройки
- установка
- Секс
- должен
- показывать
- демонстрации
- показанный
- просто
- просто
- с
- одинарной
- So
- Решения
- Искриться
- специалист
- конкретный
- указанный
- Расходы
- стек
- стандарт
- Начало
- Начало
- Шаг
- Шаги
- Stop
- диск
- студия
- подсеть
- Успешно
- Поддержанный
- Поддержка
- SYS
- команда
- Технический
- технологии
- который
- Ассоциация
- их
- Их
- следовательно
- Эти
- Через
- время
- в
- инструментом
- трек
- трафик
- Train
- Обучение
- преобразований
- преобразован
- Путешествие
- суд
- правда
- учебник
- Типы
- ui
- обнародование
- Обновление ПО
- использование
- ценностное
- Наши ценности
- версия
- с помощью
- Вид
- Просмотры
- теплый
- ЧТО Ж
- Что
- который
- будете
- в
- Рабочие процессы
- работает
- работает
- записывать
- письмо
- YAML
- лет
- Ты
- ВАШЕ
- зефирнет