Быстрый взгляд на структурированную потоковую передачу Spark + Kafka

Изучение основ использования этого мощного дуэта для задач потоковой обработки.

Фото Никита Сингхал on Unsplash

Недавно я начал много изучать Apache Kafka и Apache Spark, две ведущие технологии в мире обработки данных.

За последние несколько месяцев я сделал несколько проектов с их использованием; «Потоковая передача машинного обучения с помощью Kafka, Debezium и BentoML» — пример. Моя цель — научиться создавать мощные конвейеры данных с помощью этих современных известных инструментов и понять их преимущества и недостатки.

В последние месяцы я уже рассказывал, как создавать конвейеры ETL, используя оба инструмента, но никогда не используя их вместе, и этот пробел я восполню сегодня.

Наша цель — изучить общую идею создания потокового приложения с помощью Spark+Kafka и быстро рассмотреть его основные концепции с использованием реальных данных.

Идея проста: Apache Kafka — это инструмент потоковой передачи сообщений, в котором производители записывают сообщения на одном конце очереди (называемой тема) для чтения потребителями с другой стороны.

Но это очень сложный инструмент, созданный как отказоустойчивая распределенная служба обмена сообщениями со всеми видами гарантий доставки (точно один раз, один раз, любое), хранилищем и репликацией сообщений, а также обеспечивающий гибкость, масштабируемость и высокую пропускную способность. Он имеет более широкий набор вариантов использования, таких как связь с микросервисами, системы событий в реальном времени и потоковые конвейеры ETL.

Apache Spark — это механизм преобразования данных на основе распределенной памяти.

Это также очень сложный инструмент, способный подключаться ко всем видам баз данных, файловым системам и облачной инфраструктуре. Он предназначен для работы в распределенных средах для распараллеливания обработки между машинами, достижения высокопроизводительных преобразований за счет использования философии ленивых вычислений и оптимизации запросов.

Самое интересное в этом то, что к концу дня код представляет собой просто ваш обычный SQL-запрос или (почти) ваш скрипт Python + pandas, со всем колдовством, абстрагированным под красивым, удобным для пользователя высокоуровневым API.

Объедините эти две технологии, и у нас появится идеальное решение для создания потокового конвейера ETL.

Мы будем использовать данные датчиков дорожного движения в городе Белу-Оризонти (БГ), столице штата Минас-Жерайс (Бразилия). Это огромный набор данных, содержащий измерения транспортных потоков в нескольких местах города. Каждый датчик периодически определяет тип транспортного средства, движущегося в этом месте (автомобиль, мотоцикл, автобус/грузовик), его скорость и длину (а также другую информацию, которую мы не собираемся использовать).

Этот набор данных представляет собой одно из классических приложений для потоковых систем — группу датчиков, непрерывно отправляющих свои показания с места.

В этом сценарии Apache Kafka можно использовать в качестве уровня абстракции между датчиками и приложениями, потребляющими их данные.

Kafka используется как уровень абстракции между источниками и сервисами. Изображение автора.

Имея такую ​​инфраструктуру, можно строить всевозможные (так называемые) системы реального времени, управляемые событиями, как программа для обнаружения и оповещения о пробках, когда количество транспортных средств внезапно увеличивается при падении средней скорости.

И здесь в игру вступает Apache Spark.

Он имеет собственный модуль для потоковой обработки под названием Структурированная потоковая передача Spark, который может подключаться к Kafka и обрабатывать его сообщения.

Настройка среды

Все, что вам нужно, это docker и docker-compose.

Мы будем использовать конфигурацию файла docker-compose на основе следующих репозиториев: ссылка искра, ссылка Кафка.

Ассоциация ./источник Volume — это место, куда мы собираемся поместить наши скрипты.

Чтобы запустить среду, просто запустите

сбор докеров

Весь код доступен здесь Репозиторий GitHub.

Когда я начал изучать Spark, мне больше всего понравилось сходство написанного для него кода и моих обычных скриптов на Python+Pandas. Мигрировать было очень легко.

Следуя той же логике, потоковый модуль Spark очень похож на обычный спарк-код, что позволяет легко мигрировать с пакетных приложений на потоковые.

С учетом сказанного, в следующих разделах мы сосредоточимся на изучении особенностей структурированной потоковой передачи Spark, т. е. на том, какие новые функции она имеет.

Наша первая работа

Давайте начнем медленно и создадим игрушечный пример.

Первое, что нужно сделать, — это создать тему Kafka, из которой наше искровое задание будет получать сообщения.

Это сделано доступ к контейнерному терминалу Кафка и выполнение:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Чтобы смоделировать, как производитель пишет сообщения на эту тему, воспользуемся командой kafka-консоль-производитель. Также внутри контейнера:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Теперь каждая строка, набранная в терминале, будет отправляться как сообщение в тестовую тему. Символ «:» используется для разделения ключа и значения сообщения (ключ:значение).

Давайте создадим задание Spark, чтобы использовать эту тему.

Код необходимо поместить внутрь /src/потоковая передача папка (ничего особенного, просто папка, которую я выбрал).

Главное, что следует отметить, это то, что мы используем атрибуты readStream и записьПоток, вместо обычного чтения и записи. Это основной аспект, который заставляет Spark относиться к нашей работе как к потоковому приложению.

Для подключения к Кафке необходимо указать сервер и тему. Опция startOffsets="самый ранний» говорит Спарку прочитать тему с самого начала. Кроме того, поскольку Kafka хранит свои сообщения в двоичный форму, их необходимо декодировать в string.

Другие варианты будут дополнительно изучены.

Теперь давайте получим доступ к контейнеру Spark и запустим задание.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

После нескольких секунд настройки он начнет использовать тему.

Искра потребляет сообщения от Кафки. Изображение автора.

Spark Streaming работает в микродозирование режиме, и именно поэтому мы видим «пакетную» информацию, когда она потребляет сообщения.

Микропакетная обработка — это что-то среднее между полной «настоящей» потоковой передачей, когда все сообщения обрабатываются индивидуально по мере их поступления, и обычной пакетной передачей, где данные остаются статичными и используются по требованию. Spark будет ждать некоторое время, пытаясь накопить сообщения для их совместной обработки, сокращая накладные расходы и увеличивая задержку. Это можно настроить в соответствии с вашими потребностями.

Я не очень быстро печатаю, поэтому Spark обрабатывает сообщение, прежде чем я смогу включить новые в текущий пакет.

И это была наша первая стриминговая работа!

Надеюсь, у вас возникло ощущение: написать код для обработки потока несложно, но есть некоторые ошибки.

Запись данных в поток Kafka

Теперь пришло время начать играть с данными датчиков.

Вы можете скачать застежка-молния файл за АВГУСТ 2022 ГОДА и извлеките его в архив. /данные объем. Данные изначально хранятся в формате JSON и занимают около 23 ГБ места. Первое, что нужно сделать, это преобразовать его в паркет, чтобы оптимизировать дисковое пространство и время чтения.

Задания искры для этого подробно описаны в репозитории GitHub, все, что вам нужно сделать, это выполнить их:

искровая отправка /src/transform_json_to_parquet.pyискровая отправка /src/join_parquet_files.py

В зависимости от вашей машины выполнение может занять некоторое время. Но это окупается: окончательный размер файла паркета составляет ~ 1 ГБ (более чем в 20 раз меньше) и читается намного быстрее.

Нам также необходимо создать тему Kafka для получения наших сообщений:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

При желании, если вы хотите отображать поступающие сообщения, можно настроить консольного потребителя.

kafka-console-consumer.sh --topic Traffic_sensor --bootstrap-server localhost:9092

Записывать данные по теме Kafka легко, но есть некоторые детали.

В структурированной потоковой передаче поведение по умолчанию — не пытаться определить схему данных (столбцы и их типы), поэтому нам нужно передать ее.

Сообщения Kafka представляют собой просто пары двоичных строк «ключ-значение», поэтому нам необходимо представлять наши данные в этом формате. Этого можно легко добиться, преобразовав все строки в строки JSON, закодировав их в двоичном формате и сохранив результат в столбце «значение».

Преобразование столбцов в строки JSON. Изображение автора.

Ключи сообщений очень важны в Kafka, но в наших тестах они не пригодятся, поэтому все сообщения будут одинаковыми.

Как упоминалось ранее, этот набор данных ОГРОМНЫЙ, поэтому я ограничил количество вставленных сообщений до 500,000 XNUMX.

Наконец, мы передаем сервер и тему Kafka и «контрольно-пропускной пунктМестоположение», где искра будет хранить ход выполнения, что полезно для восстановления после ошибок.

Выполнение задания:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Вставка данных в Кафку. Изображение автора.

Слева задание Spark считывает файл, справа — Kafka-консоль-потребитель отображает поступающие сообщения.

Наша тема трафика заполнена и почти готова к обработке.

Важно помнить, что мы использовали искровую работу для заполнения нашей темы только в учебных целях. В реальном сценарии датчики сами отправят показания непосредственно в Kafka.

Чтобы имитировать такое динамическое поведение, приведенный ниже сценарий записывает в тему 1 строку каждые 2.5 секунды.

Режимы вывода — Подсчет количества транспортных средств по типам

Двигаясь дальше, давайте создадим задание для подсчета количества транспортных средств по типам.

Столбец «Classificação» (Классификация) содержит обнаруженный тип транспортного средства.

Поскольку мы читаем эту тему, нам нужно преобразовать двоичные строки JSON обратно в столбчатый формат.

Как только это будет сделано, запрос можно будет построить как обычно. Интересно отметить, что сердцевина запроса — это всего лишь выберите().группа по().считать(), все остальное относится к логике потоковой передачи.

Итак, пришло время обратиться к режим вывода() вариант.

Режим вывода потокового приложения определяет, как мы хотим (пере) вычислять и записывать результаты по мере поступления новых данных.

Он может принимать три разных значения:

  • присоединять: Добавлять в выходные данные только новые записи.
  • Завершенный: пересчитывать полный результат для каждой новой записи.
  • Обновление ПО: Обновить измененные записи.

Эти режимы могут иметь или не иметь смысла в зависимости от написанного приложения. Например, «полный» режим может не иметь смысла, если выполняется какая-либо группировка или сортировка.

Давайте выполним задание в «завершенном» режиме и посмотрим на результаты.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Каминьян — Грузовик, Автомобиль-Автомобиль, Неопределенный-Неопределенный, Автобус-Онибус, Мотоцикл-Мотоцикл. Изображение автора.

Когда в поток вставляются новые записи (см. терминал справа), задание пересчитывает полный результат. Это может быть полезно в ситуациях, когда порядок строк важен, например, в рейтинге или конкуренции.

Однако этот подход может оказаться неоптимальным, если количество групп слишком велико или отдельные изменения не влияют на общий результат.

Итак, другой вариант — использовать режим вывода «обновление», который генерирует новое сообщение только для тех групп, которые изменились. См. ниже:

Запрос с режимом вывода «обновление». Изображение автора.

Режим «добавить» недоступен для запросов с группировкой, поэтому показать с использованием одного и того же задания я не смогу. Но я думаю, что это самый простой режим, он всегда добавляет новую запись в вывод.

Эти режимы вывода проще понять, если вы думаете о сохранении результатов в таблице. В режиме полного вывода таблица будет перезаписываться для каждого нового обработанного сообщения, в режиме обновления — только те строки, в которых произошло какое-то обновление, а добавление всегда будет добавлять новую строку в конец.

Переворачивающееся временное окно — агрегирование с использованием временных интервалов

В потоковых системах сообщения имеют две разные временные метки, связанные с ними: время события — время создания сообщения, в нашем случае время чтения датчика, и время обработки — когда сообщение прочитано агентом обработки, в нашем случае, когда он достигает Искры.

Важной особенностью инструментов потоковой обработки является возможность обработки времени события. Переворачивающиеся окна — это непересекающиеся фиксированные интервалы времени, используемые для создания агрегатов с использованием столбцов времени событий. Проще говоря, они делят временную шкалу на кусочки одинакового размера, так что каждое событие принадлежит одному интервалу.

Например, подсчитайте каждые 5 минут, сколько транспортных средств было обнаружено за последние 5 минут.

5-минутное падающее окно. Изображение автора.

Код ниже иллюстрирует это:

Этот вид обработки может быть чрезвычайно полезен во многих ситуациях. Возвращаясь к предложенному ранее детектору пробок, один из возможных подходов — измерить среднюю скорость транспортных средств за 10-минутный интервал и посмотреть, не ниже ли она определенного порога.

Обработка событий во время — сложная тема. При работе с ним может случиться что угодно: например, сообщения теряются, приходят слишком поздно или выходят из строя. В Spark есть несколько механизмов, позволяющих решить эти проблемы, например водяные знаки, на этом мы не будем заострять внимание.

Временные окна также можно использовать в сочетании с другими столбцами в таблице. группа по(). В приведенном ниже примере подсчитывается количество транспортных средств по типам в 5-минутном окне.

Скользящее временное окно — Гибкость временных интервалов

Скользящие временные окна — это гибкий вариант меняющихся окон. Вместо создания непересекающихся интервалов они позволяют определить, как часто будет создаваться каждый интервал.

Например, каждые 5 минут подсчитывайте, сколько транспортных средств было обнаружено за последние 30 минут.

Благодаря этому события могут принадлежать множеству интервалов и учитываться столько раз, сколько необходимо.

Чтобы определить скользящее окно, просто передайте интервал обновления в окно() функция.

Давайте посмотрим результат.

Как мы видим, у нас есть 30-минутные окна, создаваемые каждые 5 минут.

Эта гибкость может быть весьма полезна для определения более конкретных бизнес-правил и более сложных триггеров. Например, наш детектор пробок может отправлять ответы каждые 5 секунд в течение последних 10 минут и создавать оповещения, когда средняя скорость автомобиля падает ниже 20 км/ч.

Это был краткий обзор основных концепций структурированной потоковой передачи Spark и того, как их можно применять с помощью Kafka.

Apache Kafka и Apache Spark — это надежные и надежные инструменты, используемые многими компаниями для ежедневной обработки невероятных объемов данных, что делает их одной из самых надежных пар в задаче потоковой обработки.

Мы научились заполнять, использовать и обрабатывать темы Kafka с помощью заданий Spark. Это была несложная задача, как упоминалось в посте, API потоковой обработки практически равен обычному пакетному API, с небольшими изменениями.

Мы также обсудили различные режимы вывода, некоторые особенности потоковых приложений и способы использования каждого из них. И последнее, но не менее важное: мы исследовали агрегации с временными окнами — одну из основных возможностей потоковой обработки.

Опять же, это был всего лишь беглый взгляд, и я оставлю несколько ссылок ниже, если вы хотите изучить глубже.

Надеюсь, я чем-то помог, спасибо, что дочитали! 🙂

Весь код доступен здесь Репозиторий GitHub.
Используемые данные —
Контагены Volumétricas de Radares, Открыть данные, губернатор Бразилии

[1] Подробное описание функции: водяные знаки в структурированной потоковой передаче Apache Spark — Макс Фишер в блоге Databricks
[2] Чемберс Б. и Захария М. (2018). Spark: Полное руководство: обработка больших данных стала проще. «О'Рейли Медиа, Инк.».
[3] Логистика, доставка и транспортировка в реальном времени с помощью Apache Kafka— Кай Ванер
[4] Показывая Apache Kafka в Netflix Studio и Finance World — Слитный блог
[5] Spark Streaming и Kafka — https://sparkbyexamples.com/

Быстрый взгляд на структурированную потоковую передачу Spark + Kafka, повторно опубликованную из источника https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 через https:/ /towardsdatascience.com/feed

<!–

->

Отметка времени:

Больше от Блокчейн-консультанты