O privire rapidă asupra fluxului Spark Structured + Kafka

Învățați elementele de bază despre cum să utilizați acest duo puternic pentru sarcini de procesare a fluxului

Fotografie de Nikhita Singhal on Unsplash

Recent am început să studiez multe despre Apache Kafka și Apache Spark, două tehnologii de vârf în lumea ingineriei datelor.

Am realizat mai multe proiecte folosindu-le în ultimele luni; „Învățare automată în flux cu Kafka, Debezium și BentoML” este un exemplu. Accentul meu este să învăț cum să creez conducte de date puternice cu aceste instrumente moderne faimoase și să îmi fac o idee despre avantajele și dezavantajele lor.

În ultimele luni, am vorbit deja despre cum să creez conducte ETL folosind ambele instrumente, dar niciodată folosindu-le împreună, și acesta este golul pe care îl voi umple astăzi.

Scopul nostru este să aflăm ideea generală din spatele creării unei aplicații de streaming cu Spark+Kafka și să aruncăm o privire rapidă asupra conceptelor sale principale folosind date reale.

Ideea este simplă — Apache Kafka este un instrument de transmitere a mesajelor, în care producătorii scriu mesaje la un capăt al unei cozi (numită subiect) pentru a fi citite de consumatori pe de altă parte.

Dar este un instrument foarte complex, construit pentru a fi un serviciu de mesagerie distribuit rezistent, cu tot felul de garanții de livrare (exact o dată, o dată, oricare), stocarea mesajelor și replicarea mesajelor, permițând totodată flexibilitate, scalabilitate și un randament ridicat. Are un set mai larg de cazuri de utilizare, cum ar fi comunicarea cu microservicii, sistemele de evenimente în timp real și conductele ETL de streaming.

Apache Spark este un motor de transformare a datelor bazat pe memorie distribuită.

Este, de asemenea, un instrument foarte complex, capabil să se conecteze cu tot felul de baze de date, sisteme de fișiere și infrastructură cloud. Este proiectat să funcționeze în medii distribuite pentru a paraleliza procesarea între mașini, realizând transformări de înaltă performanță prin utilizarea filozofiei sale de evaluare leneșă și a optimizărilor de interogare.

Partea interesantă este că, până la sfârșitul zilei, codul este doar interogarea ta SQL obișnuită sau (aproape) scriptul tău Python+pandas, cu toată vrăjitoria abstractă sub un API de nivel înalt, ușor de utilizat.

Alăturați-vă acestor două tehnologii și avem o potrivire perfectă pentru a construi o conductă ETL de streaming.

Vom folosi datele de la senzorii de trafic din orașul Belo Horizonte (BH), capitala Minas Gerais (Brazilia). Este un set uriaș de date care conține măsurători ale fluxului de trafic în mai multe locuri din oraș. Fiecare senzor detectează periodic tipul de vehicul care circulă în acea locație (mașină, motocicletă, autobuz/camion), viteza și lungimea acestuia (și alte informații pe care nu le vom folosi).

Acest set de date reprezintă tocmai una dintre aplicațiile clasice pentru sistemele de streaming - un grup de senzori care își trimit continuu citirile din teren.

În acest scenariu, Apache Kafka poate fi folosit ca un strat de abstractizare între senzori și aplicațiile care le consumă datele.

Kafka folosit ca strat de abstractizare între surse și servicii. Imagine de autor.

Cu acest tip de infrastructură, este posibil să construiți tot felul de (așa-numitele) sisteme în timp real bazate pe evenimente, ca un program de detectare și alertă pentru blocajele de trafic atunci când numărul de vehicule crește brusc odată cu scăderea vitezei medii.

Și aici intervine Apache Spark.

Are un modul nativ pentru procesarea fluxului numit Streaming structurat Spark, care se poate conecta la Kafka și procesa mesajele acestuia.

Configurarea mediului

Tot ce aveți nevoie este docker și docker-compose.

Vom folosi o configurație de fișier docker-compose bazată pe următoarele depozite: scânteie de legătură, link kafka.

./src volumul este locul în care vom pune scripturile.

Pentru a porni mediul, doar alergați

docker-compune-te

Tot codul este disponibil în aceasta GitHub depozit.

Unul dintre lucrurile care mi-au plăcut cel mai mult când am început să studiez Spark a fost asemănarea dintre codul scris pentru acesta și scripturile mele obișnuite python+pandas. A fost foarte ușor de migrat.

Urmând aceeași logică, modulul de streaming al lui Spark este foarte asemănător cu codul spark obișnuit, facilitând migrarea de la aplicațiile batch la cele de flux.

Acestea fiind spuse, în secțiunile următoare, ne vom concentra pe învățarea specificului streamingului structurat Spark, adică ce caracteristici noi are.

Primul nostru loc de muncă

Să începem încet și să construim un exemplu de jucărie

Primul lucru pe care trebuie să-l faceți este să creați un subiect Kafka de unde jobul nostru va consuma mesajele.

Acest lucru este realizat de accesând terminalul de containere Kafka și executând:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Pentru a simula un producător care scrie mesaje pe acest subiect, să folosim kafka-consola-producator. De asemenea, în interiorul recipientului:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

De acum, fiecare linie introdusă în terminal va fi trimisă ca mesaj către subiectul de testare. Caracterul „:” este folosit pentru a separa cheia și valoarea mesajului (cheie:valoare).

Să creăm un job Spark pentru a folosi acest subiect.

Codul trebuie introdus în interiorul /src/streaming folder (nimic special, doar folderul pe care l-am ales).

Principalul lucru de reținut este că folosim atributele readStream și scrie flux, în loc de citire și scriere obișnuită. Acesta este principalul aspect care face ca Spark să ne trateze meseria ca pe o aplicație de streaming.

Pentru a vă conecta la Kafka, este necesar să specificați serverul și subiectul. Optiunea startingOffsets="cel mai devreme” îi spune Spark să citească subiectul de la început. De asemenea, pentru că Kafka își stochează mesajele în binar forma, acestea trebuie decodate şir.

Celelalte opțiuni vor fi explorate în continuare.

Acum, să accesăm containerul Spark și să rulăm lucrarea.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

După câteva secunde de configurare, va începe să consume subiectul.

Mesaje consumatoare de scântei de la Kafka. Imagine de autor.

Spark Streaming funcționează în micro-loturi modul, și de aceea vedem informațiile „lot” atunci când consumă mesajele.

Micro-loturi este oarecum între streaming complet „adevărat”, în care toate mesajele sunt procesate individual pe măsură ce sosesc și lotul obișnuit, în care datele rămân statice și sunt consumate la cerere. Spark va aștepta ceva timp încercând să acumuleze mesaje pentru a le procesa împreună, reducând cheltuielile generale și crescând latența. Acesta poate fi adaptat nevoilor dvs.

Nu sunt un tipar foarte rapid, așa că Spark procesează mesajul înainte de a putea include altele noi în lotul curent.

Și acesta a fost primul nostru job de streaming!

Sper că aveți senzația: nu este greu să codificați o lucrare de procesare a fluxului, dar există câteva probleme.

Scrierea datelor într-un flux Kafka

Acum este timpul să începeți să vă jucați cu datele senzorului.

Puteți descărca zip fișier din AUGUST 2022 și extrageți-l în fișierul /date volum. Datele sunt inițial în JSON și ocupă aproximativ 23 Gb de spațiu. Primul lucru de făcut este să îl convertești în parchet pentru a optimiza spațiul pe disc și timpul de citire.

Lucrările spark pentru a face acest lucru sunt detaliate în depozitul GitHub, tot ce trebuie să faceți este să le executați:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

În funcție de mașina dvs., execuția poate dura ceva timp. Dar se plătește, dimensiunea finală a fișierului cu parchet este de ~1 Gb (mai mult de 20 de ori mai mică) și mult mai rapid de citit.

De asemenea, trebuie să creăm subiectul Kafka pentru a ne primi mesajele:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Opțional, dacă doriți să afișați mesajele care sosesc, este posibil să configurați un consumator de consolă.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Scrierea datelor pe un subiect Kafka este ușor, dar are câteva detalii.

În fluxul structurat, comportamentul implicit este de a nu încerca să deducem schema de date (coloanele și tipurile acestora), așa că trebuie să trecem una.

Mesajele Kafka sunt doar perechi de șiruri binare cheie-valoare, așa că trebuie să ne reprezentăm datele în acest format. Acest lucru poate fi realizat cu ușurință prin conversia tuturor rândurilor în șiruri JSON, codificarea lor în binar și stocarea rezultatului în coloana „valoare”.

Transformarea coloanelor în șiruri JSON. Imagine de autor.

Cheile pentru mesaje sunt foarte importante în Kafka, dar nu vor fi utile în testele noastre, așa că toate mesajele vor avea la fel.

După cum am menționat anterior, acest set de date este URIAȘ, așa că am limitat numărul de mesaje inserate la 500,000.

În cele din urmă, trecem de serverul și subiectul Kafka și un „punct de controlLocație” unde scânteia va stoca progresul execuției, util pentru a recupera din erori.

Executarea sarcinii:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Inserarea datelor în Kafka. Imagine de autor.

În stânga, jobul Spark citește fișierul, în dreapta, a kafka-consola-consumator afișează mesajele care sosesc.

Subiectul nostru de trafic este populat și aproape gata pentru a fi procesat.

Este important să ne amintim că am folosit un job de scânteie pentru a ne popula subiectul doar în scopuri de învățare. Într-un scenariu real, senzorii înșiși vor trimite citiri direct către Kafka.

Pentru a simula acest comportament dinamic, scriptul de mai jos scrie 1 rând în subiect la fiecare 2.5 secunde.

Moduri de ieșire — Numărarea numărului de vehicule după tip

Mergând mai departe, să creăm un loc de muncă pentru a număra numărul de vehicule după tip.

Coloana „Clasificare” (Clasificare) conține tipul de vehicul detectat.

Pe măsură ce citim din subiect, trebuie să convertim șirurile binare JSON înapoi în formatul de coloană.

Odată făcut acest lucru, interogarea poate fi construită ca de obicei. Este interesant de observat că inima de interogare este doar selecta,a se grupa cu,conta(), restul este relativ la logica de streaming.

Așa că este timpul să abordăm outputMode() opțiune.

Modul de ieșire al unei aplicații de flux specifică modul în care dorim să (re)calculăm și să scriem rezultatele pe măsură ce sosesc date noi.

Poate lua trei valori diferite:

  • Adăuga: adăugați numai înregistrări noi la ieșire.
  • Completa: Recalculați rezultatul complet pentru fiecare înregistrare nouă.
  • Actualizează: Actualizați înregistrările modificate.

Aceste moduri pot sau nu au sens în funcție de aplicația scrisă. De exemplu, modul „complet” poate să nu aibă sens dacă se efectuează vreo grupare sau sortare.

Să executăm lucrarea în modul „complet” și să ne uităm la rezultate.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Camion, Automóvel-Mașină, Indefinit-Nedefinit, Ônibus-Autobuz, Moto-Motocicletă. Imagine de autor.

Pe măsură ce noi înregistrări sunt introduse în flux (vezi terminalul din dreapta), jobul recalculează rezultatul complet. Acest lucru poate fi util în situațiile în care ordonarea rândurilor este importantă, cum ar fi clasarea sau competiția.

Cu toate acestea, această abordare poate să nu fie optimă dacă numărul de grupuri este prea mare sau dacă modificările individuale nu afectează rezultatul general.

Deci, o altă opțiune este utilizarea modului de ieșire „actualizare”, care generează un mesaj nou numai pentru grupurile care s-au schimbat. Vezi mai jos:

Interogarea cu modul de ieșire „actualizare”. Imagine de autor.

Modul „adăugați” nu este disponibil pentru interogările cu grupare, așa că nu voi putea afișa folosind aceeași sarcină. Dar cred că este cel mai simplu mod, acesta mereu adaugă o înregistrare nouă la ieșire.

Aceste moduri de ieșire sunt mai ușor de înțeles dacă vă gândiți la salvarea rezultatelor într-un tabel. În modul de ieșire completă, tabelul va fi rescris pentru fiecare mesaj nou procesat, în modul de actualizare, doar liniile în care a avut loc o actualizare, iar anexarea va adăuga întotdeauna o nouă linie la sfârșit.

Tumbling time window — Agregarea folosind intervale de timp

În sistemele de streaming, mesajele au două marcaje temporale diferite legate de ele: Ora evenimentului — Ora la care a fost creat mesajul, în cazul nostru, timpul de citire al senzorului și Timpul procesării — Când mesajul este citit de agentul de procesare, în cazul nostru când ajunge la Spark.

O caracteristică importantă a instrumentelor de procesare a fluxului este capacitatea de a gestiona procesarea timpului evenimentului. Ferestrele care se prăbușesc sunt intervale de timp fixe care nu se suprapun, utilizate pentru a face agregari folosind coloane de timp eveniment. Pentru a spune mai simplu, ei departează cronologia în felii de dimensiuni egale, astfel încât fiecare eveniment să aparțină unui singur interval.

De exemplu, numărați, la fiecare 5 minute, câte vehicule au fost detectate în ultimele 5 minute.

Fereastră de 5 min. Imagine de autor.

Codul de mai jos ilustrează acest lucru:

Acest tip de prelucrare poate fi extrem de util în multe situații. Revenind la detectorul de ambuteiaj propus mai devreme, o abordare posibilă este măsurarea vitezei medii a vehiculelor într-o fereastră de 10 minute și a vedea dacă este sub un anumit prag.

Procesarea în timpul evenimentului este un subiect complex. Totul se poate întâmpla atunci când ai de-a face cu el, cum ar fi mesajele care se pierd, sosesc prea târziu sau ies din ordine. Spark are mai multe mecanisme pentru a încerca să atenueze problemele, cum ar fi filigrane, pe care nu ne vom concentra.

Ferestrele de timp pot fi, de asemenea, utilizate împreună cu alte coloane din a se grupa cu(). Exemplul de mai jos numără numărul de vehicule după tip într-o fereastră de 5 minute.

Fereastra de timp alunecată — Flexibilizare pe intervalele de timp

Ferestrele de timp glisante sunt o flexibilizare a ferestrelor care se rotesc. În loc să creeze intervale care nu se suprapun, acestea permit definirea cât de des va fi creat fiecare interval.

De exemplu, la fiecare 5 minute, numărați câte vehicule au fost detectate în ultimele 30 de minute.

Din acest motiv, evenimentele pot aparține mai multor intervale și pot fi numărate de câte ori este nevoie.

Pentru a defini o fereastră glisantă, treceți intervalul de actualizare la fereastră() funcția.

Să vedem rezultatul.

După cum putem vedea, avem ferestre de 30 de minute create la fiecare 5 minute.

Această flexibilitate poate fi destul de utilă pentru a defini reguli de afaceri mai specifice și declanșatoare mai complexe. De exemplu, detectorul nostru de blocaj în trafic poate trimite răspunsuri la fiecare 5 secunde în ultimele 10 minute și poate crea o alertă atunci când viteza medie a mașinii scade sub 20 km/h.

Aceasta a fost o privire rapidă asupra conceptelor principale ale Spark Structured Streaming și cum pot fi aplicate cu Kafka.

Apache Kafka și Apache Spark sunt atât instrumente fiabile, cât și robuste utilizate de multe companii pentru a procesa zilnic cantități incredibile de date, făcându-le una dintre cele mai puternice perechi în sarcina de procesare a fluxului.

Am învățat cum să populam, să consumăm și să procesăm subiectele Kafka folosind joburi Spark. Aceasta nu a fost o sarcină grea, așa cum s-a menționat în postare, API-ul de procesare a fluxului este aproape egal cu API-ul batch obișnuit, cu doar câteva ajustări minore.

Am discutat, de asemenea, despre diferite moduri de ieșire, ceva specific aplicațiilor de flux și modul în care fiecare poate fi utilizat. Nu în ultimul rând, am explorat agregările cu ferestre de timp, una dintre principalele capabilități ale procesării fluxului.

Din nou, aceasta a fost o privire rapidă și voi lăsa câteva referințe mai jos dacă doriți să explorați mai profund.

Sper că te-am ajutat cumva, mulțumesc că ai citit! 🙂

Tot codul este disponibil în aceasta GitHub depozit.
Date utilizate -
Contagens Volumétricas de Radares, Deschideți date, guvernatorul brazilian.

[1] Caracteristică Deep Dive: Watermarking în Apache Spark Structured Streaming — Max Fisher pe blogul Databricks
[2] Chambers, B., & Zaharia, M. (2018). Spark: Ghidul definitiv: procesarea datelor mari simplificată. „O'Reilly Media, Inc.”.
[3] Logistică, expediere și transport în timp real cu Apache Kafka— Kai Waehner
[4] Cu Apache Kafka în Netflix Studio și în lumea finanțelor — Blog confluent
[5] Spark Streaming și Kafka — https://sparkbyexamples.com/

O privire rapidă asupra streamingului structurat Spark + Kafka republicat din sursă https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 prin https:/ /towardsdatascience.com/feed

<!–

->

Timestamp-ul:

Mai mult de la Consultanți Blockchain