Короткий огляд Spark Structured Streaming + Kafka

Вивчення основ використання цього потужного дуету для завдань потокової обробки

Фото Ніхіта Сінгхал on Unsplash

Нещодавно я почав багато вивчати Apache Kafka та Apache Spark, дві провідні технології у світі інженерії даних.

За останні кілька місяців я зробив кілька проектів, використовуючи їх; «Потокове передавання машинного навчання за допомогою Kafka, Debezium і BentoML” є прикладом. Я зосереджуся на тому, щоб навчитися створювати потужні конвеєри даних за допомогою цих сучасних відомих інструментів і отримати уявлення про їхні переваги та недоліки.

За останні місяці я вже розповідав, як створювати конвеєри ETL, використовуючи обидва інструменти, але ніколи не використовуючи їх разом, і це прогалина, яку я заповню сьогодні.

Наша мета — вивчити загальну ідею створення програми для потокового передавання за допомогою Spark+Kafka та швидко ознайомитися з її основними концепціями, використовуючи реальні дані.

Ідея проста — Apache Kafka — це інструмент потокової передачі повідомлень, де виробники пишуть повідомлення на одному кінці черги (називається тема) для читання споживачами з іншого.

Але це дуже складний інструмент, створений як стійка розподілена служба обміну повідомленнями, з усіма видами гарантій доставки (одноразово, одноразово, будь-коли), зберіганням повідомлень і реплікацією повідомлень, а також забезпечує гнучкість, масштабованість і високу пропускну здатність. Він має ширший набір варіантів використання, як-от зв’язок мікросервісів, системи подій у реальному часі та потокові конвеєри ETL.

Apache Spark — це механізм перетворення даних на основі розподіленої пам’яті.

Це також дуже складний інструмент, здатний підключатися до різноманітних баз даних, файлових систем і хмарної інфраструктури. Він призначений для роботи в розподілених середовищах, щоб розпаралелювати обробку між машинами, досягаючи високопродуктивних перетворень за допомогою філософії ледачого оцінювання та оптимізації запитів.

Цікава частина цього полягає в тому, що наприкінці дня код є просто вашим звичайним SQL-запитом або (майже) вашим сценарієм Python+pandas, з усім чаклунством, абстрагованим у приємному зручному API високого рівня.

Об’єднайте ці дві технології, і ми матимемо ідеальну пару для створення потокового конвеєра ETL.

Ми будемо використовувати дані датчиків руху в місті Белу-Орізонті (BH), столиці Мінас-Жерайс (Бразилія). Це величезний набір даних, що містить вимірювання транспортного потоку в кількох місцях міста. Кожен датчик періодично визначає тип транспортного засобу, що рухається в цьому місці (автомобіль, мотоцикл, автобус/вантажівка), його швидкість і довжину (та іншу інформацію, яку ми не збираємося використовувати).

Цей набір даних точно представляє одне з класичних застосувань для потокових систем — групи датчиків, які постійно надсилають свої показання з поля.

У цьому сценарії Apache Kafka можна використовувати як рівень абстракції між датчиками та програмами, які споживають їхні дані.

Kafka використовувався як шар абстракції між джерелами та службами. Зображення автора.

З такою інфраструктурою можна будувати всілякі (так звані) системи реального часу, керовані подіями, як програма для виявлення заторів і попередження про них, коли кількість транспортних засобів раптово збільшується зі зниженням середньої швидкості.

І тут у гру вступає Apache Spark.

Він має рідний модуль для потокової обробки під назвою Spark Structured Streaming, який може підключатися до Kafka та обробляти його повідомлення.

Налаштування середовища

Все, що вам потрібно, це docker і docker-compose.

Ми будемо використовувати конфігурацію файлу Docker-Compose на основі таких репозиторіїв: посилальна іскра, посилання kafka.

Команда ./src том, куди ми розмістимо наші сценарії.

Щоб запустити середовище, просто запустіть

докер-створити

Весь код доступний тут GitHub сховище.

Однією з речей, яка мені найбільше сподобалася, коли я почав вивчати Spark, була схожість між написаним кодом для нього та моїми звичайними сценаріями python+pandas. Це було дуже легко мігрувати.

Відповідно до тієї ж логіки, потоковий модуль Spark дуже схожий на звичайний код Spark, що полегшує міграцію з пакетних додатків до потокових.

Зважаючи на це, у наступних розділах ми зосередимося на вивченні особливостей структурованої потокової передачі Spark, тобто на тому, які нові функції вона має.

Наша перша робота

Давайте почнемо повільно і побудуємо приклад іграшки

Перше, що потрібно зробити, це створити тему Kafka, з якої наша іскриста робота буде споживати повідомлення.

Це робиться шляхом доступ до контейнерного терміналу Kafka і виконання:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Щоб імітувати виробника, який пише повідомлення на цю тему, скористаємося kafka-console-producer. Також всередині контейнера:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Відтепер кожен рядок, набраний у терміналі, надсилатиметься як повідомлення до тестової теми. Символ «:» використовується для розділення ключа повідомлення та значення (ключ:значення).

Давайте створимо роботу Spark, щоб використовувати цю тему.

Код потрібно розмістити всередині /src/streaming папка (нічого особливого, просто папка, яку я вибрав).

Головне, на що слід звернути увагу, це те, що ми використовуємо атрибути readStream та writeStream, замість звичайного читання та запису. Це головний аспект, який змушує Spark розглядати нашу роботу як потокову програму.

Для підключення до Kafka необхідно вказати сервер і тему. Варіант починаючизміщення=“earliest» повідомляє Spark прочитати тему з початку. Також тому, що Кафка зберігає свої повідомлення в двійковий форму, їх потрібно розшифрувати рядок.

Інші варіанти будуть додатково досліджені.

Тепер давайте отримаємо доступ до контейнера Spark і запустимо завдання.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Після кількох секунд конфігурації він почне споживати тему.

Іскропоглинаючі повідомлення від Кафки. Зображення автора.

Spark Streaming працює в мікродозування і тому ми бачимо «пакетну» інформацію, коли вона споживає повідомлення.

Мікропакетування є чимось між повним «справжнім» потоковим передаванням, коли всі повідомлення обробляються окремо, коли вони надходять, і звичайним пакетом, де дані залишаються статичними та споживаються за вимогою. Spark чекатиме деякий час, намагаючись накопичити повідомлення, щоб обробити їх разом, зменшуючи накладні витрати та збільшуючи затримку. Це можна налаштувати відповідно до ваших потреб.

Я не дуже швидко друкую, тому Spark обробляє повідомлення, перш ніж я можу включити нові в поточний пакет.

І це була наша перша трансляційна робота!

Сподіваюся, у вас з’явилося відчуття: написати завдання обробки потоку неважко, але є деякі недоліки.

Запис даних у потік Kafka

Тепер настав час почати грати з даними датчика.

Ви можете завантажити ZIP файл із СЕРПНЯ 2022 РОКУ та розпакуйте його в / data обсяг. Дані спочатку містяться у форматі JSON і займають близько 23 ГБ місця. Перше, що потрібно зробити, це перетворити його на паркет, щоб оптимізувати дисковий простір і час читання.

Завдання Spark для цього детально описано в репозиторії GitHub, все, що вам потрібно зробити, це виконати їх:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Залежно від вашої машини, виконання може зайняти деякий час. Але це окупається, кінцевий розмір файлу паркету становить ~1 Гб (більш ніж у 20 разів менший) і набагато швидший для читання.

Нам також потрібно створити тему Kafka, щоб отримувати наші повідомлення:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

За бажанням, якщо ви хочете відображати повідомлення, що надходять, можна налаштувати споживача консолі.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Написати дані на тему Кафки легко, але є деякі деталі.

У структурованій потоковій передачі поведінка за замовчуванням полягає в тому, щоб не намагатися вивести схему даних (стовпці та їхні типи), тому нам потрібно передати її.

Повідомлення Kafka — це лише пари двійкових рядків ключ-значення, тому нам потрібно представити наші дані в цьому форматі. Цього можна легко досягти, перетворивши всі рядки на рядки JSON, закодувавши їх у двійковий код і зберігши результат у стовпці «значення».

Перетворення стовпців у рядки JSON. Зображення автора.

Ключі повідомлень дуже важливі в Kafka, але вони не будуть корисними в наших тестах, тому всі повідомлення матимуть однакові значення.

Як згадувалося раніше, цей набір даних ВЕЛИЧЕЗНИЙ, тому я обмежив кількість вставлених повідомлень до 500,000 XNUMX.

Нарешті, ми передаємо сервер Kafka, тему та «checkpointLocation”, де spark зберігатиме прогрес виконання, корисний для відновлення після помилок.

Виконання завдання:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Вставлення даних у Kafka. Зображення автора.

Ліворуч завдання Spark читає файл, праворуч a kafka-console-consumer відображає прибулі повідомлення.

Наша тема трафіку заповнена та майже готова до обробки.

Важливо пам’ятати, що ми використовували spark job для заповнення нашої теми лише з метою навчання. У реальному сценарії датчики самі надсилатимуть показання безпосередньо Кафці.

Щоб імітувати цю динамічну поведінку, наведений нижче сценарій записує 1 рядок у тему кожні 2.5 секунди.

Режими виведення — Підрахунок кількості транспортних засобів за типом

Далі створимо завдання для підрахунку кількості транспортних засобів за типом.

Стовпець «Classificação» (Класифікація) містить виявлений тип автомобіля.

Оскільки ми читаємо з теми, нам потрібно перетворити двійкові рядки JSON назад у формат стовпців.

Коли це буде зроблено, запит можна створювати як зазвичай. Цікаво відзначити, що серце запиту є просто вибратиgroupByвважати(), все інше стосується потокової логіки.

Тож настав час зайнятися outputMode() варіант.

Режим виведення потокової програми визначає, як ми хочемо (повторно) обчислювати та записувати результати, коли надходять нові дані.

Він може приймати три різні значення:

  • Додавати: додавати лише нові записи до виводу.
  • Цілковита: Повторне обчислення повного результату для кожного нового запису.
  • Оновити: оновити змінені записи.

Ці режими можуть мати або не мати сенсу залежно від написаної програми. Наприклад, режим «повний» може не мати сенсу, якщо виконується будь-яке групування або сортування.

Давайте виконаємо роботу в режимі «завершено» і подивимося на результати.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Вантажівка, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Зображення автора.

Коли нові записи вставляються в потік (див. термінал праворуч), завдання повторно обчислює повний результат. Це може бути корисним у ситуаціях, коли порядок рядків важливий, наприклад рейтинг або змагання.

Однак цей підхід може бути не оптимальним, якщо кількість груп надто велика або окремі зміни не впливають на загальний результат.

Отже, іншим варіантом є використання режиму виведення «оновлення», який генерує нове повідомлення лише для груп, які змінилися. Дивись нижче:

Запит з режимом виведення «оновити». Зображення автора.

Режим «додавання» недоступний для запитів із групуванням, тому я не зможу показати використання того самого завдання. Але я думаю, що це найпростіший режим, це завжди додає новий запис до виводу.

Ці режими виведення простіше зрозуміти, якщо ви думаєте про збереження результатів у таблиці. У режимі повного виводу таблиця переписуватиметься для кожного нового обробленого повідомлення, у режимі оновлення – лише для рядків, де відбулося оновлення, а додавання завжди додаватиме новий рядок у кінець.

Часове вікно перекидання — агрегування за часовими інтервалами

У потокових системах повідомлення мають дві різні позначки часу, пов’язані з ними: час події — час створення повідомлення, у нашому випадку час зчитування датчика, і час обробки — коли повідомлення прочитано агентом обробки, у нашому випадку коли він досягає Spark.

Важливою особливістю інструментів потокової обробки є можливість обробки часу події. Вікна переміщення — це фіксовані інтервали часу, які не перекриваються, і використовуються для створення агрегацій за допомогою стовпців часу подій. Простіше кажучи, вони розрізають шкалу часу на фрагменти однакового розміру, щоб кожна подія належала до одного інтервалу.

Наприклад, підраховуйте кожні 5 хвилин, скільки транспортних засобів було виявлено за останні 5 хвилин.

5-хвилинне перекидаюче вікно. Зображення автора.

Наведений нижче код ілюструє це:

Такий вид обробки може бути надзвичайно корисним у багатьох ситуаціях. Повертаючись до запропонованого раніше детектора заторів, одним із можливих підходів є вимірювання середньої швидкості транспортних засобів за 10-хвилинне вікно та визначення того, чи є вона нижче певного порогу.

Подієва обробка є складною темою. Під час роботи з ним може статися будь-що, як-от повідомлення втрачено, надходить із запізненням або виходить із ладу. Spark має кілька механізмів, щоб спробувати пом’якшити проблеми, наприклад водяні знаки, на якому ми не будемо акцентувати увагу.

Часові вікна також можна використовувати разом з іншими стовпцями в groupBy(). Наведений нижче приклад підраховує кількість транспортних засобів за типом у 5-хвилинному вікні.

Ковзне часове вікно — гнучкість часових інтервалів

Ковзні часові вікна — це гнучкість перекидаючихся вікон. Замість створення інтервалів, що не перекриваються, вони дозволяють визначити, як часто буде створюватися кожен інтервал.

Наприклад, кожні 5 хвилин підраховуйте, скільки транспортних засобів було виявлено за останні 30 хвилин.

Через це події можуть належати багатьом інтервалам і рахуватися скільки завгодно разів.

Щоб визначити ковзне вікно, просто передайте інтервал оновлення в вікно() функція.

Подивимося на вихід.

Як ми бачимо, кожні 30 хвилин ми створюємо 5-хвилинні вікна.

Ця гнучкість може бути дуже корисною для визначення більш конкретних бізнес-правил і більш складних тригерів. Наприклад, наш детектор заторів може надсилати відповіді кожні 5 секунд за останні 10 хвилин і створювати сповіщення, коли середня швидкість автомобіля падає нижче 20 км/год.

Це був короткий огляд основних концепцій Spark Structured Streaming і того, як їх можна застосувати з Kafka.

Apache Kafka та Apache Spark є надійними та надійними інструментами, які використовуються багатьма компаніями для щоденної обробки неймовірних обсягів даних, що робить їх однією з найсильніших пар у завданні потокової обробки.

Ми навчилися заповнювати, споживати та обробляти теми Kafka за допомогою завдань Spark. Це не було важким завданням, як згадувалося в дописі, API потокової обробки майже дорівнює звичайному пакетному API, лише з деякими незначними коригуваннями.

Ми також обговорили різні режими виводу, щось специфічне для потокових програм, і як кожен з них можна використовувати. І останнє, але не менш важливе: ми досліджували агрегації з часовими вікнами, одну з основних можливостей потокової обробки.

Знову ж таки, це був лише швидкий погляд, і я залишу кілька посилань нижче, якщо ви хочете дослідити глибше.

Сподіваюся, я якось допоміг, дякую, що прочитали! 🙂

Весь код доступний тут GitHub сховище.
Використані дані —
Contagens Volumétricas de Radares, відкрити дані, губернатор Бразилії

[1] Функція Deep Dive: Водяні знаки в Apache Spark Structured Streaming — Макс Фішер у блозі Databricks
[2] Чемберс Б. та Захарія М. (2018). Spark: повне керівництво: обробка великих даних стала простою. “O'Reilly Media, Inc.”.
[3] Логістика, доставка та транспортування в режимі реального часу з Apache Kafka— Кай Вейнер
[4] З Apache Kafka у студії Netflix та у світі фінансів — Конфлюентний блог
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

Короткий огляд Spark Structured Streaming + Kafka Опубліковано з джерела https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 через https:/ /towardsdatascience.com/feed

<!–

->

Часова мітка:

Більше від Консультанти з блокчейнів