Szybkie spojrzenie na Strumieniowanie Strukturyzowane Spark + Kafka

Nauka podstaw korzystania z tego potężnego duetu do zadań związanych z przetwarzaniem strumieniowym

Photo by Nikhita Singhal on Unsplash

Ostatnio zacząłem dużo studiować na temat Apache Kafka i Apache Spark, dwóch wiodących technologii w świecie inżynierii danych.

W ciągu ostatnich kilku miesięcy wykonałem z ich wykorzystaniem kilka projektów; „Przesyłanie strumieniowe uczenia maszynowego za pomocą Kafki, Debezium i BentoML” to przykład. Skupiam się na nauce tworzenia potężnych potoków danych za pomocą tych nowoczesnych, słynnych narzędzi i poznaniu ich zalet i wad.

W ostatnich miesiącach omawiałem już, jak tworzyć potoki ETL przy użyciu obu narzędzi, ale nigdy nie używając ich jednocześnie, i tę lukę dzisiaj wypełnię.

Naszym celem jest poznanie ogólnej idei tworzenia aplikacji do przesyłania strumieniowego za pomocą Spark+Kafka i szybkie spojrzenie na jej główne koncepcje przy użyciu rzeczywistych danych.

Pomysł jest prosty — Apache Kafka to narzędzie do strumieniowego przesyłania wiadomości, w którym producenci zapisują wiadomości na jednym końcu kolejki (tzw. aktualny) do odczytania przez konsumentów z drugiej strony.

Jest to jednak bardzo złożone narzędzie, zbudowane jako odporna usługa przesyłania wiadomości rozproszonych, oferująca wszelkiego rodzaju gwarancje dostarczenia (dokładnie raz, raz, dowolne), przechowywanie wiadomości i replikację wiadomości, a jednocześnie zapewniającą elastyczność, skalowalność i wysoką przepustowość. Ma szerszy zestaw przypadków użycia, takich jak komunikacja mikrousług, systemy zdarzeń w czasie rzeczywistym i strumieniowe potoki ETL.

Apache Spark to silnik transformacji danych oparty na pamięci rozproszonej.

Jest to również bardzo złożone narzędzie, które może łączyć się z wszelkiego rodzaju bazami danych, systemami plików i infrastrukturą chmurową. Jest przystosowany do działania w środowiskach rozproszonych w celu równoległego przetwarzania między maszynami, osiągając transformacje o wysokiej wydajności dzięki zastosowaniu filozofii leniwej oceny i optymalizacji zapytań.

Najfajniejsze jest to, że ostatecznie kod jest zwykłym zapytaniem SQL lub (prawie) skryptem Python+pandas, z całą magią abstrakcyjną w ramach ładnego, przyjaznego dla użytkownika interfejsu API wysokiego poziomu.

Połącz te dwie technologie, a będziemy mieli idealne dopasowanie do zbudowania strumieniowego potoku ETL.

Wykorzystamy dane z czujników ruchu w mieście Belo Horizonte (BH), stolicy Minas Gerais (Brazylia). To ogromny zbiór danych zawierający pomiary natężenia ruchu w kilku miejscach miasta. Każdy czujnik okresowo wykrywa rodzaj pojazdu poruszającego się w danym miejscu (samochód, motocykl, autobus/ciężarówka), jego prędkość i długość (oraz inne informacje, których nie będziemy wykorzystywać).

Ten zbiór danych reprezentuje dokładnie jedno z klasycznych zastosowań systemów przesyłania strumieniowego — grupę czujników wysyłających w sposób ciągły swoje odczyty z terenu.

W tym scenariuszu Apache Kafka może służyć jako warstwa abstrakcji pomiędzy czujnikami a aplikacjami zużywającymi ich dane.

Kafka używana jako warstwa abstrakcji pomiędzy źródłami i usługami. Zdjęcie autorstwa autora.

Przy takiej infrastrukturze możliwa jest budowa wszelkiego rodzaju (tzw.) systemy sterowane zdarzeniami w czasie rzeczywistym, jak program do wykrywania korków i ostrzegania o nich, gdy liczba pojazdów nagle wzrasta wraz ze spadkiem średniej prędkości.

I tu właśnie pojawia się Apache Spark.

Posiada natywny moduł do przetwarzania strumieni o nazwie Strumieniowanie strukturalne Spark, które mogą łączyć się z platformą Kafka i przetwarzać jej wiadomości.

Przygotowanie środowiska

Wszystko czego potrzebujesz to okno dokowane i funkcja docker-compose.

Użyjemy konfiguracji pliku docker-compose w oparciu o następujące repozytoria: iskra łącza, link do kafki.

Połączenia ./źr wolumen to miejsce, w którym będziemy umieszczać nasze skrypty.

Aby uruchomić środowisko wystarczy uruchomić

dokuj-skomponuj

Cały kod jest dostępny w this Repozytorium GitHub.

Jedną z rzeczy, która najbardziej mi się spodobała, gdy zaczynałem studiować Sparka, było podobieństwo pomiędzy napisanym dla niego kodem a moimi zwykłymi skryptami Pythona i Pandy. Migracja była bardzo łatwa.

Kierując się tą samą logiką, moduł przesyłania strumieniowego Spark jest bardzo podobny do zwykłego kodu Spark, co ułatwia migrację z aplikacji wsadowych do aplikacji strumieniowych.

Powiedziawszy to, w kolejnych sekcjach skupimy się na poznaniu specyfiki strukturalnego przesyłania strumieniowego Spark, tj. jakie nowe funkcje ma.

Nasza pierwsza praca

Zacznijmy powoli i zbudujmy przykładową zabawkę

Pierwszą rzeczą do zrobienia jest utworzenie tematu Kafki, z którego nasze zadanie Spark będzie pobierać wiadomości.

Odbywa się to przez dojazd do terminalu kontenerowego Kafka i wykonanie:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Aby zasymulować producenta piszącego wiadomości na ten temat, użyjmy metody producent-konsoli kafka. Również w pojemniku:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Od tej chwili każda linia wpisana w terminalu będzie wysyłana jako wiadomość do tematu testowego. Znak „:” oddziela klucz i wartość wiadomości (klucz:wartość).

Utwórzmy zadanie platformy Spark, aby wykorzystać ten temat.

Kod należy umieścić w środku /src/streaming folder (nic specjalnego, tylko folder, który wybrałem).

Najważniejszą rzeczą, na którą należy zwrócić uwagę, jest to, że używamy atrybutów czytaj Strumień i napisz strumień, zamiast normalnego odczytu i zapisu. To główny aspekt, który sprawia, że ​​Spark traktuje naszą pracę jako aplikację do przesyłania strumieniowego.

Aby połączyć się z Kafką konieczne jest określenie serwera i tematu. Opcja początkowePrzesunięcia=“najwcześniej” każe Sparkowi przeczytać temat od początku. Ponadto, ponieważ Kafka przechowuje swoje wiadomości w dwójkowy formie, do której należy je rozszyfrować ciąg.

Pozostałe opcje zostaną dokładniej zbadane.

Przejdźmy teraz do kontenera Spark i uruchommy zadanie.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Po kilku sekundach konfiguracji zacznie pochłaniać temat.

Iskra pochłania wiadomości od Kafki. Zdjęcie autorstwa autora.

Spark Streaming działa w mikrodozowanie trybie i dlatego widzimy informacje „wsadowe”, gdy zużywają one wiadomości.

Mikro-wsadowość to coś pomiędzy pełnym „prawdziwym” przesyłaniem strumieniowym, w którym wszystkie wiadomości są przetwarzane indywidualnie w momencie ich otrzymania, a zwykłą wsadą, w której dane pozostają statyczne i są wykorzystywane na żądanie. Spark będzie czekać jakiś czas, próbując zgromadzić komunikaty w celu przetworzenia ich razem, zmniejszając obciążenie i zwiększając opóźnienia. Można to dostosować do swoich potrzeb.

Nie umiem pisać zbyt szybko, więc Spark przetwarza wiadomość, zanim będę mógł dołączyć nowe do bieżącej partii.

I to była nasza pierwsza praca nad streamingiem!

Mam nadzieję, że masz wrażenie: kodowanie zadania przetwarzania strumienia nie jest trudne, ale są pewne problemy.

Zapisywanie danych w strumieniu Kafki

Teraz czas zacząć zabawę z danymi z czujnika.

Możesz pobrać zamek błyskawiczny z SIERPNIA 2022 r. i wyodrębnij go do pliku /dane tom. Dane są oryginalnie w formacie JSON i zajmują około 23 GB miejsca. Pierwszą rzeczą do zrobienia jest przekształcenie go w parkiet, aby zoptymalizować miejsce na dysku i czas czytania.

Zadania iskrowe służące do tego są szczegółowo opisane w repozytorium GitHub. Wszystko, co musisz zrobić, to je wykonać:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

W zależności od komputera wykonanie może zająć trochę czasu. Ale to się opłaca, ostateczny rozmiar pliku parkietu wynosi ~ 1 Gb (ponad 20 razy mniej) i jest znacznie szybszy do odczytania.

Musimy także utworzyć temat Kafki, aby otrzymywać nasze wiadomości:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

Opcjonalnie, jeśli chcesz wyświetlać przychodzące wiadomości, możesz skonfigurować konsumenta konsoli.

kafka-console-consumer.sh --topic czujnik ruchu --bootstrap-server localhost:9092

Zapisywanie danych na temat Kafki jest łatwe, ale zawiera pewne szczegóły.

W przypadku strukturalnego przesyłania strumieniowego domyślnym zachowaniem jest niepróbowanie wnioskowania o schemacie danych (kolumnach i ich typach), więc musimy go przekazać.

Komunikaty Kafki to po prostu pary ciągów binarnych klucz-wartość, dlatego musimy reprezentować nasze dane w tym formacie. Można to łatwo osiągnąć, konwertując wszystkie wiersze na ciągi JSON, kodując je w formacie binarnym i zapisując wynik w kolumnie „wartość”.

Przekształcanie kolumn w ciągi JSON. Zdjęcie autorstwa autora.

Klucze wiadomości są bardzo ważne w Kafce, ale nie będą przydatne w naszych testach, więc wszystkie wiadomości będą miały takie same.

Jak wspomniałem wcześniej, ten zbiór danych jest OGROMNY, dlatego ograniczyłem liczbę wstawianych wiadomości do 500,000 XNUMX.

Na koniec przekazujemy serwer i temat Kafki oraz „punkt kontrolnyLokalizacja”, gdzie iskra będzie przechowywać postęp wykonania, przydatny do odzyskiwania po błędach.

Wykonanie zadania:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Wstawianie danych do Kafki. Zdjęcie autorstwa autora.

Po lewej stronie zadanie Spark odczytuje plik, po prawej a konsument-konsola kafka wyświetla przychodzące wiadomości.

Nasz temat ruchu jest wypełniony i prawie gotowy do przetworzenia.

Należy pamiętać, że użyliśmy zadania iskrowego do zapełnienia naszego tematu wyłącznie w celach edukacyjnych. W prawdziwym scenariuszu same czujniki będą wysyłać odczyty bezpośrednio do Kafki.

Aby zasymulować to dynamiczne zachowanie, poniższy skrypt zapisuje 1 wiersz do tematu co 2.5 sekundy.

Tryby wyjściowe — Zliczanie liczby pojazdów według typu

Idąc dalej, utwórzmy zadanie zliczania liczby pojazdów według typu.

Kolumna „Classificação” (Klasyfikacja) zawiera wykryty typ pojazdu.

Jak czytamy z tematu, musimy przekonwertować ciągi binarne JSON z powrotem do formatu kolumnowego.

Po wykonaniu tej czynności zapytanie można zbudować w zwykły sposób. Warto zauważyć, że serce zapytania to tylko wybierać().Grupuj według().liczyć() sekwencja, cała reszta zależy od logiki przesyłania strumieniowego.

Czas zatem zająć się tryb wyjściowy() opcja.

Tryb wyjściowy aplikacji strumieniowej określa, w jaki sposób chcemy (ponownie) obliczyć i zapisać wyniki po nadejściu nowych danych.

Może przyjmować trzy różne wartości:

  • Dodać: Dodaj tylko nowe rekordy do wyniku.
  • Absolutna: Oblicz ponownie pełny wynik dla każdego nowego rekordu.
  • Aktualizacja: aktualizacja zmienionych rekordów.

Tryby te mogą mieć sens lub nie, w zależności od napisanej aplikacji. Na przykład tryb „kompletny” może nie mieć sensu, jeśli wykonywane jest jakiekolwiek grupowanie lub sortowanie.

Wykonajmy zadanie w trybie „ukończonym” i spójrzmy na wyniki.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — ciężarówka, automóvel-samochód, nieokreślony-nieokreślony, Ônibus-Bus, motocykl-motocykl. Zdjęcie autorstwa autora.

W miarę wstawiania nowych rekordów do strumienia (patrz terminal po prawej stronie) zadanie ponownie oblicza pełny wynik. Może to być przydatne w sytuacjach, w których ważna jest kolejność wierszy, np. ranking lub konkurencja.

Jednak takie podejście może nie być optymalne, jeśli liczba grup jest zbyt duża lub poszczególne zmiany nie mają wpływu na ogólny wynik.

Zatem inną opcją jest użycie trybu wyjściowego „aktualizacja”, który generuje nową wiadomość tylko dla grup, które uległy zmianie. Zobacz poniżej:

Zapytanie z trybem wyjściowym „aktualizacja”. Zdjęcie autorstwa autora.

Tryb „dołączania” nie jest dostępny dla zapytań z grupowaniem, więc nie będę mógł wyświetlić przy użyciu tego samego zadania. Ale myślę, że jest to najprostszy tryb, to zawsze dodaje nowy rekord do wyniku.

Te tryby wyjściowe są łatwiejsze do zrozumienia, jeśli myślisz o zapisaniu wyników w tabeli. W trybie pełnego wyjścia tabela zostanie przepisana dla każdej nowej przetworzonej wiadomości, w trybie aktualizacji tylko te wiersze, w których nastąpiła aktualizacja, a dołączenie zawsze doda nową linię na końcu.

Okno czasowe spadania — agregowanie przy użyciu przedziałów czasowych

W systemach przesyłania strumieniowego wiadomości mają dwa różne znaczniki czasu: Czas zdarzenia — czas utworzenia wiadomości, w naszym przypadku czas odczytu czujnika, oraz Czas przetwarzania — kiedy wiadomość jest czytana przez agenta przetwarzającego, w naszym przypadku dociera do Sparka.

Ważną cechą narzędzi do przetwarzania strumieni jest możliwość obsługi przetwarzania czasu zdarzeń. Okna wirujące to nienakładające się stałe przedziały czasu używane do tworzenia agregacji przy użyciu kolumn czasu zdarzenia. Mówiąc prościej, dzielą oś czasu na kawałki o jednakowej wielkości, tak aby każde wydarzenie należało do jednego przedziału.

Na przykład co 5 minut policz, ile pojazdów wykryto w ciągu ostatnich 5 minut.

5-minutowe obracające się okno. Zdjęcie autorstwa autora.

Poniższy kod ilustruje to:

Tego rodzaju przetwarzanie może być niezwykle przydatne w wielu sytuacjach. Wracając do zaproponowanego wcześniej czujnika korków, jednym z możliwych podejść jest zmierzenie średniej prędkości pojazdów w ciągu 10 minut i sprawdzenie, czy nie przekracza ona określonego progu.

Przetwarzanie w czasie zdarzenia to złożony temat. Kiedy sobie z tym poradzisz, wszystko może się zdarzyć, na przykład zagubienie wiadomości, dotarcie za późno lub zepsucie się. Spark ma kilka mechanizmów, które próbują złagodzić problemy, np Znaki wodne, na którym nie będziemy się skupiać.

Okna czasowe mogą być również używane w połączeniu z innymi kolumnami w pliku Grupuj według(). Poniższy przykład zlicza liczbę pojazdów według typu w oknie 5-minutowym.

Ruchome okno czasowe — Uelastycznienie przedziałów czasowych

Przesuwające się okna czasowe są uelastycznieniem walących się okien. Zamiast tworzyć nienakładające się interwały, pozwalają określić, jak często każdy interwał będzie tworzony.

Na przykład co 5 minut zliczaj, ile pojazdów wykryto w ciągu ostatnich 30 minut.

Dzięki temu zdarzenia mogą należeć do wielu przedziałów i być zliczane tyle razy, ile potrzeba.

Aby zdefiniować przesuwane okno, po prostu przekaż interwał aktualizacji do okno() funkcja.

Zobaczmy wynik.

Jak widzimy, co 30 minut tworzone są 5-minutowe okna.

Ta elastyczność może być bardzo przydatna do definiowania bardziej szczegółowych reguł biznesowych i bardziej złożonych wyzwalaczy. Na przykład nasz czujnik korków może wysyłać odpowiedzi co 5 sekund przez ostatnie 10 minut i tworzyć alert, gdy średnia prędkość samochodu spadnie poniżej 20 km/h.

Było to szybkie spojrzenie na główne koncepcje Spark Structured Streaming i sposoby ich zastosowania w Kafce.

Apache Kafka i Apache Spark to niezawodne i solidne narzędzia używane przez wiele firm do codziennego przetwarzania niesamowitych ilości danych, co czyni je jedną z najsilniejszych par w zadaniu przetwarzania strumieniowego.

Nauczyliśmy się, jak wypełniać, wykorzystywać i przetwarzać tematy platformy Kafka przy użyciu zadań platformy Spark. Nie było to trudne zadanie, jak wspomniano w poście, interfejs API przetwarzania strumieniowego jest prawie taki sam jak zwykły interfejs API wsadowy, z niewielkimi zmianami.

Omówiliśmy także różne tryby wyjściowe, specyficzne dla aplikacji strumieniowych i sposoby wykorzystania każdego z nich. Na koniec zbadaliśmy agregację z oknami czasowymi, jedną z głównych możliwości przetwarzania strumienia.

Powtórzę raz jeszcze: było to po prostu szybkie spojrzenie i jeśli chcesz głębiej przyjrzeć się temu tematowi, poniżej zostawię kilka odniesień.

Mam nadzieję, że w jakiś sposób pomogłem, dziękuję za przeczytanie! 🙂

Cały kod jest dostępny w this Repozytorium GitHub.
Wykorzystane dane —
Contagens Volumétricas de Radares, Otwórz dane, brazylijski gubernator

[1] Funkcja Głębokie nurkowanie: znak wodny w strukturalnym przesyłaniu strumieniowym Apache Spark — Max Fisher na blogu Databricks
[2] Chambers, B. i Zaharia, M. (2018). Spark: Kompletny przewodnik: proste przetwarzanie dużych zbiorów danych. „O'Reilly Media, Inc.”.
[3] Logistyka, wysyłka i transport w czasie rzeczywistym dzięki Apache Kafka— Kai Waehner
[4] Zawiera Apache Kafka w studiu Netflix i świecie finansów — Konkretny blog
[5] Streaming Spark i Kafka — https://sparkbyexamples.com/

Szybkie spojrzenie na strukturalne przesyłanie strumieniowe Spark + Kafka ponownie opublikowane ze źródła https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 przez https:// /w kierunkudatascience.com/feed

<!–

->

Znak czasu:

Więcej z Konsultanci Blockchain