Et hurtigt kig på Spark Structured Streaming + Kafka

Lær det grundlæggende i, hvordan du bruger denne kraftfulde duo til stream-behandlingsopgaver

Photo by Nikhita Singhal on Unsplash

For nylig begyndte jeg at studere en masse om Apache Kafka og Apache Spark, to førende teknologier inden for dataingeniørverdenen.

Jeg har lavet flere projekter ved at bruge dem i de sidste par måneder; “Machine Learning Streaming med Kafka, Debezium og BentoML” er et eksempel. Mit fokus er at lære at skabe kraftfulde datapipelines med disse moderne berømte værktøjer og få en fornemmelse af deres fordele og ulemper.

I de sidste måneder har jeg allerede dækket, hvordan man opretter ETL-pipelines ved at bruge begge værktøjer, men aldrig bruge dem sammen, og det er det hul, jeg vil udfylde i dag.

Vores mål er at lære den generelle idé bag at bygge en streamingapplikation med Spark+Kafka og give et hurtigt kig på dets hovedkoncepter ved hjælp af rigtige data.

Ideen er enkel - Apache Kafka er et værktøj til streaming af beskeder, hvor producenter skriver beskeder i den ene ende af en kø (kaldet en emne) til at blive læst af forbrugerne på den anden side.

Men det er et meget komplekst værktøj, bygget til at være en robust distribueret meddelelsestjeneste med alle mulige leveringsgarantier (præcis én gang, én gang, hvilken som helst), beskedlagring og beskedreplikering, samtidig med at det tillader fleksibilitet, skalerbarhed og høj gennemstrømning. Det har et bredere sæt af use cases, såsom mikroservicekommunikation, hændelsessystemer i realtid og streaming af ETL-pipelines.

Apache Spark er en distribueret hukommelsesbaseret datatransformationsmotor.

Det er også et meget komplekst værktøj, der kan forbindes med alle slags databaser, filsystemer og cloud-infrastruktur. Den er gearet til at fungere i distribuerede miljøer for at parallelisere behandling mellem maskiner og opnå højtydende transformationer ved at bruge sin dovne evalueringsfilosofi og forespørgselsoptimeringer.

Den fede del ved det er, at koden ved udgangen af ​​dagen bare er din sædvanlige SQL-forespørgsel eller (næsten) dit Python+pandas-script, med al hekseri abstraheret under en dejlig brugervenlig API på højt niveau.

Slut dig til disse to teknologier, og vi har et perfekt match til at bygge en streaming ETL-pipeline.

Vi vil bruge data fra trafiksensorer i byen Belo Horizonte (BH), hovedstaden i Minas Gerais (Brasilien). Det er et enormt datasæt, der indeholder målinger af trafikafviklingen flere steder i byen. Hver sensor registrerer med jævne mellemrum typen af ​​køretøj, der kører på det pågældende sted (bil, motorcykel, bus/lastbil), dets hastighed og længde (og andre oplysninger, som vi ikke kommer til at bruge).

Dette datasæt repræsenterer netop en af ​​de klassiske applikationer til streamingsystemer - en gruppe af sensorer, der sender deres aflæsninger kontinuerligt fra marken.

I dette scenarie kan Apache Kafka bruges som et abstraktionslag mellem sensorerne og de applikationer, der forbruger deres data.

Kafka brugt som et abstraktionslag mellem kilder og tjenester. Billede af forfatter.

Med denne form for infrastruktur er det muligt at bygge alle mulige (de såkaldte) hændelsesdrevne systemer i realtid, som et program til at registrere og advare om trafikpropper, når antallet af køretøjer pludselig stiger med et fald i gennemsnitshastigheden.

Og det er her, Apache Spark kommer ind i billedet.

Det har et indbygget modul til strømbehandling kaldet Spark Structured Streaming, der kan oprette forbindelse til Kafka og behandle dets budskaber.

Opsætning af miljøet

Alt du behøver er docker og docker-compose.

Vi bruger en docker-compose-filkonfiguration baseret på følgende arkiver: link gnist, link kafka.

./src volumen er der, hvor vi lægger vores scripts.

For at starte miljøet skal du bare løbe

docker-compose up

Al koden er tilgængelig i denne GitHub repository.

En af de ting, jeg bedst kunne lide, da jeg begyndte at studere Spark, var ligheden mellem skrevet kode til det og mine sædvanlige python+pandas-scripts. Det var meget nemt at migrere.

Efter samme logik ligner Sparks streaming-modul meget den sædvanlige spark-kode, hvilket gør det nemt at migrere fra batch-applikationerne til stream-programmerne.

Når det er sagt, vil vi i de følgende afsnit fokusere på at lære specificiteterne ved Spark-struktureret streaming, dvs. hvilke nye funktioner den har.

Vores første job

Lad os starte langsomt og bygge et legetøjseksempel

Den første ting at gøre er at oprette et Kafka-emne, hvorfra vores gnistjob vil forbruge beskederne.

Dette gøres af adgang til Kafka containerterminal og udfører:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

For at simulere en producent, der skriver beskeder om dette emne, lad os bruge kafka-konsol-producer. Også inde i beholderen:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Fra nu af vil hver linje, der indtastes i terminalen, blive sendt som en besked til testemnet. Tegnet ":" bruges til at adskille beskedens nøgle og værdi (nøgle:værdi).

Lad os oprette et Spark-job for at bruge dette emne.

Koden skal sættes inde i /src/streaming mappe (ikke noget særligt, bare den mappe, jeg valgte).

Det vigtigste at bemærke er, at vi bruger attributterne læsestream , skriveStream, i stedet for normal læsning og skrivning. Dette er hovedaspektet, der gør, at Spark behandler vores job som en streamingapplikation.

For at oprette forbindelse til Kafka er det nødvendigt at angive serveren og emnet. Muligheden startOffsets=“tidligst” fortæller Spark at læse emnet fra begyndelsen. Også fordi Kafka gemmer sine budskaber i binær form, skal de afkodes til streng.

De andre muligheder vil blive undersøgt yderligere.

Lad os nu få adgang til Spark-beholderen og køre jobbet.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Efter et par sekunders konfiguration begynder den at forbruge emnet.

Gnistforbrugende beskeder fra Kafka. Billede af forfatter.

Spark Streaming fungerer i mikro-batching tilstand, og det er derfor, vi ser "batch"-informationen, når den bruger meddelelserne.

Micro-batching er noget mellem fuld "true" streaming, hvor alle beskeder behandles individuelt, efterhånden som de ankommer, og den sædvanlige batch, hvor dataene forbliver statiske og forbruges on-demand. Spark vil vente noget tid med at prøve at samle meddelelser for at behandle dem sammen, hvilket reducerer overhead og øger latens. Dette kan indstilles til dine behov.

Jeg er ikke en superhurtig typer, så Spark behandler beskeden, før jeg kan inkludere nye i den aktuelle batch.

Og det var vores første streamingjob!

Jeg håber, at du får følelsen af: det er ikke svært at kode et streambehandlingsjob, men der er nogle gotchas.

Skrivning af data til en Kafka-strøm

Nu er det tid til at begynde at lege med sensordataene.

Du kan hente den zip fil fra AUGUST 2022 og udpak den ind i /data bind. Dataene er oprindeligt i JSON og tager omkring 23 Gb plads. Den første ting at gøre er at konvertere den til parket for at optimere diskplads og læsetid.

Gnistjobbene for at gøre dette er detaljeret i GitHub-lageret, alt hvad du skal gøre er at udføre dem:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Afhængigt af din maskine kan udførelsen tage noget tid. Men det betaler sig, den endelige parketfilstørrelse er ~1Gb (mere end 20x mindre) og meget hurtigere at læse.

Vi skal også oprette Kafka-emnet for at modtage vores beskeder:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Hvis du ønsker at vise de ankommende beskeder, er det eventuelt muligt at oprette en konsolforbruger.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

At skrive data om et Kafka-emne er nemt, men har nogle detaljer.

I struktureret streaming er standardadfærden ikke at forsøge at udlede dataskemaet (kolonner og deres typer), så vi skal videregive et.

Kafka-meddelelser er blot nøgleværdi binære strengpar, så vi skal repræsentere vores data i dette format. Dette kan nemt opnås ved at konvertere alle rækker til JSON-strenge, kode dem binært og gemme resultatet i kolonnen "værdi".

Omdannelse af kolonner til JSON-strenge. Billede af forfatter.

Beskednøgler er meget vigtige i Kafka, men de vil ikke være nyttige i vores test, så alle meddelelser vil have det samme.

Som nævnt før er dette datasæt ENORMT, så jeg begrænsede antallet af indsatte beskeder til 500,000.

Til sidst passerer vi Kafka-serveren og emnet og en "kontrolpunktPlacering” hvor gnisten vil gemme eksekveringsfremskridtet, nyttigt at gendanne efter fejl.

Udførelse af jobbet:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Indsættelse af data i Kafka. Billede af forfatter.

Til venstre læser Spark-jobbet filen, til højre en kafka-konsol-forbruger viser de ankommende beskeder.

Vores trafikemne er udfyldt og næsten klar til at blive behandlet.

Det er vigtigt at huske, at vi brugte et gnistjob til at udfylde vores emne kun til læringsformål. I et rigtigt scenarie vil sensorerne selv sende aflæsninger direkte til Kafka.

For at simulere denne dynamiske adfærd, skriver scriptet nedenfor 1 række til emnet hvert 2.5 sekund.

Outputtilstande — Tæller antallet af køretøjer efter type

Lad os fortsætte med at skabe et job for at tælle antallet af køretøjer efter type.

Kolonnen "Classificação" (Klassificering) indeholder den registrerede køretøjstype.

Mens vi læser fra emnet, skal vi konvertere de binære JSON-strenge tilbage til det kolonneformede format.

Når dette er gjort, kan forespørgslen bygges som normalt. Det er interessant at bemærke, at forespørgselshjertet kun er Vælg().gruppeBy().tælle() sekvens, resten er i forhold til streaminglogik.

Så det er på tide at tage fat på outputtilstand() mulighed.

Outputtilstanden for en stream-applikation specificerer, hvordan vi vil (gen)beregne og skrive resultaterne, efterhånden som nye data ankommer.

Det kan antage tre forskellige værdier:

  • Tilføj: Tilføj kun nye poster til outputtet.
  • Komplet: Genberegn det fulde resultat for hver ny rekord.
  • Opdatering: Opdater ændrede poster.

Disse tilstande kan eller kan ikke give mening afhængigt af den skrevne ansøgning. For eksempel giver tilstanden "fuldstændig" muligvis ikke mening, hvis der udføres nogen gruppering eller sortering.

Lad os udføre jobbet i "fuldført" tilstand og se på resultaterne.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Lastbil, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Billede af forfatter.

Efterhånden som nye poster indsættes i strømmen (se terminalen til højre), genberegner jobbet det fulde resultat. Dette kan være nyttigt i situationer, hvor rækkefølgen er vigtig, såsom rangering eller konkurrence.

Denne tilgang er dog muligvis ikke optimal, hvis antallet af grupper er for stort, eller de enkelte ændringer ikke påvirker det samlede resultat.

Så en anden mulighed er at bruge outputtilstanden "opdatering", som kun genererer en ny besked for de grupper, der har ændret sig. Se nedenunder:

Forespørgslen med output-tilstand "opdatering". Billede af forfatter.

"Tilføj"-tilstanden er ikke tilgængelig for forespørgsler med gruppering, så jeg vil ikke være i stand til at vise ved at bruge det samme job. Men jeg tror, ​​at det er den enkleste tilstand, det altid tilføjer en ny rekord til outputtet.

Disse outputtilstande er nemmere at forstå, hvis du tænker på at gemme resultaterne i en tabel. I den komplette output-tilstand vil tabellen blive omskrevet for hver ny meddelelse, der behandles, i opdateringstilstanden, kun de linjer, hvor en opdatering fandt sted, og tilføjelsen vil altid tilføje en ny linje til slutningen.

Tumbling tidsvindue — Aggregering ved hjælp af tidsintervaller

I streamingsystemer har meddelelser to forskellige tidsstempler relateret til dem: Hændelsestidspunkt — Tidspunktet, hvor meddelelsen blev oprettet, i vores tilfælde sensorens læsetid, og Behandlingstid — Når meddelelsen læses af behandlingsagenten, i vores tilfælde når den når Spark.

Et vigtigt træk ved strømbehandlingsværktøjer er evnen til at håndtere hændelsestidsbehandling. Tumbling-vinduer er ikke-overlappende faste tidsintervaller, der bruges til at lave sammenlægninger ved hjælp af hændelsestidskolonner. For at sige det mere enkelt deler de tidslinjen i lige store udsnit, så hver begivenhed hører til et enkelt interval.

For eksempel, tæl, hvert 5. minut, hvor mange køretøjer der blev registreret i de sidste 5 minutter.

5 min væltevindue. Billede af forfatter.

Koden nedenfor illustrerer dette:

Denne form for behandling kan være yderst nyttig i mange situationer. Går vi tilbage til den tidligere foreslåede trafikpropdetektor, er en mulig tilgang at måle køretøjernes gennemsnitshastighed i et 10-minutters vindue og se, om den er under en vis tærskel.

Hændelsestidsbehandling er et komplekst emne. Alt kan ske, når man håndterer det, som at beskeder går tabt, kommer for sent eller går ude af drift. Spark har flere mekanismer til at forsøge at afbøde problemerne, f.eks vand varemærker, som vi ikke vil fokusere på.

Tidsvinduer kan også bruges sammen med andre kolonner i groupBy(). Eksemplet nedenfor tæller antallet af køretøjer efter type i et 5 min vindue.

Glidende tidsvindue — Fleksibilisering af tidsintervallerne

Skydetidsvinduer er en flexibilisering af tumlevinduer. I stedet for at oprette ikke-overlappende intervaller, tillader de at definere, hvor ofte hvert interval skal oprettes.

For eksempel skal du hvert 5. minut tælle, hvor mange køretøjer der blev registreret i de sidste 30 minutter.

På grund af det kan begivenheder tilhøre mange intervaller og tælles så mange gange som nødvendigt.

For at definere et glidende vindue skal du blot overføre opdateringsintervallet til vindue()fungere.

Lad os se outputtet.

Som vi kan se, har vi 30 min vinduer, der bliver oprettet hver 5 min.

Denne fleksibilitet kan være ganske nyttig til at definere mere specifikke forretningsregler og mere komplekse udløsere. For eksempel kan vores trafikpropdetektor sende svar hvert 5. sekund omkring de seneste 10 minutter og oprette en advarsel, når den gennemsnitlige bilhastighed falder til under 20 km/t.

Dette var et hurtigt kig på hovedkoncepterne for Spark Structured Streaming, og hvordan de kan anvendes med Kafka.

Apache Kafka og Apache Spark er både pålidelige og robuste værktøjer, der bruges af mange virksomheder til dagligt at behandle utrolige mængder data, hvilket gør dem til et af de stærkeste par i strømbehandlingsopgaven.

Vi har lært, hvordan man udfylder, forbruger og behandler Kafka-emner ved hjælp af Spark-job. Dette var ingen svær opgave, som nævnt i indlægget, stream processing API er næsten lig med den sædvanlige batch API, med blot nogle mindre justeringer.

Vi har også diskuteret forskellige output-tilstande, noget specifikt for stream-applikationer, og hvordan hver enkelt kan bruges. Sidst, men ikke mindst, udforskede vi aggregeringer med tidsvinduer, en af ​​hovedfunktionerne ved strømbehandling.

Igen, dette var bare et hurtigt kig, og jeg vil efterlade nogle referencer nedenfor, hvis du vil udforske dybere.

Håber jeg har hjulpet på en eller anden måde, tak fordi du læste med! 🙂

Al koden er tilgængelig i denne GitHub repository.
Brugte data -
Contagens Volumétricas de Radares, Open data, brasiliansk guvernør.

[1] Feature Deep Dive: Vandmærke i Apache Spark Structured Streaming - Max Fisher på Databricks blog
[2] Chambers, B., & Zaharia, M. (2018). Spark: Den definitive guide: Big data-behandling gjort enkel. "O'Reilly Media, Inc.".
[3] Logistik, forsendelse og transport i realtid med Apache Kafka- Kai Waehner
[4] Med Apache Kafka i Netflix Studio og Finance World — Sammenflydende blog
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

Et hurtigt kig på Spark Structured Streaming + Kafka Genudgivet fra kilde https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Tidsstempel:

Mere fra Blockchain-konsulenter