Een snelle blik op Spark Structured Streaming + Kafka

Leer de basisprincipes van het gebruik van dit krachtige duo voor streamverwerkingstaken

Foto door Nikhita Singhal on Unsplash

Onlangs ben ik veel gaan studeren over Apache Kafka en Apache Spark, twee toonaangevende technologieën in de data-engineeringwereld.

Ik heb er de afgelopen maanden verschillende projecten mee gemaakt; “Machine Learning-streaming met Kafka, Debezium en BentoML' is een voorbeeld. Mijn focus is om te leren hoe je krachtige datapijplijnen kunt creëren met deze moderne beroemde tools en een idee te krijgen van hun voor- en nadelen.

De afgelopen maanden heb ik al besproken hoe je ETL-pijplijnen kunt maken met behulp van beide tools, maar ze nooit samen kunt gebruiken, en dat is het gat dat ik vandaag ga opvullen.

Ons doel is om het algemene idee achter het bouwen van een streaming-applicatie met Spark+Kafka te leren kennen en een snelle blik te werpen op de belangrijkste concepten ervan met behulp van echte data.

Het idee is simpel: Apache Kafka is een tool voor het streamen van berichten, waarbij producenten berichten schrijven aan het ene uiteinde van een wachtrij (een zogenaamde wachtrij). onderwerp) die anderzijds door consumenten moeten worden gelezen.

Maar het is een zeer complexe tool, gebouwd als een veerkrachtige gedistribueerde berichtenservice, met allerlei leveringsgaranties (precies één keer, één keer, elke keer), berichtopslag en berichtreplicatie, terwijl het ook flexibiliteit, schaalbaarheid en hoge doorvoer mogelijk maakt. Het heeft een bredere reeks gebruiksscenario's, zoals microservicescommunicatie, realtime gebeurtenissystemen en streaming ETL-pijplijnen.

Apache Spark is een gedistribueerde, op geheugen gebaseerde datatransformatie-engine.

Het is ook een zeer complexe tool die verbinding kan maken met allerlei soorten databases, bestandssystemen en cloudinfrastructuur. Het is erop gericht om in gedistribueerde omgevingen te werken om de verwerking tussen machines te parallelliseren en hoogwaardige transformaties te realiseren door gebruik te maken van de luie evaluatiefilosofie en query-optimalisaties.

Het leuke ervan is dat de code uiteindelijk gewoon je gebruikelijke SQL-query of (bijna) je Python+panda-script is, waarbij alle hekserij is geabstraheerd onder een mooie gebruiksvriendelijke API op hoog niveau.

Sluit u aan bij deze twee technologieën en we hebben de perfecte match om een ​​streaming ETL-pijplijn te bouwen.

We gebruiken de gegevens van verkeerssensoren in de stad Belo Horizonte (BH), de hoofdstad van Minas Gerais (Brazilië). Het is een enorme dataset met metingen van de verkeersstroom op verschillende plekken in de stad. Elke sensor detecteert periodiek het type voertuig dat op die locatie rijdt (auto, motor, bus/vrachtwagen), de snelheid en lengte ervan (en andere informatie die we niet gaan gebruiken).

Deze dataset vertegenwoordigt precies een van de klassieke toepassingen voor streamingsystemen: een groep sensoren die hun metingen continu vanuit het veld verzenden.

In dit scenario kan Apache Kafka worden gebruikt als abstractielaag tussen de sensoren en de applicaties die hun gegevens verbruiken.

Kafka gebruikt als abstractielaag tussen bronnen en diensten. Afbeelding door auteur.

Met dit soort infrastructuur is het mogelijk om allerlei (de zogenaamde) realtime gebeurtenisgestuurde systemen, zoals een programma dat files detecteert en waarschuwt wanneer het aantal voertuigen plotseling toeneemt bij een daling van de gemiddelde snelheid.

En dat is waar Apache Spark in het spel komt.

Het heeft een native module voor streamverwerking genaamd Spark-gestructureerd streamen, die verbinding kan maken met Kafka en zijn berichten kan verwerken.

De omgeving opzetten

Het enige wat je nodig hebt is docker en docker-compose.

We gebruiken een docker-compose-bestandsconfiguratie op basis van de volgende opslagplaatsen: link vonk, link kafka.

De ./src volume is waar we onze scripts gaan plaatsen.

Om de omgeving te starten, hoeft u alleen maar te rennen

koppelaar-compose up

Hierin is alle code beschikbaar GitHub-repository.

Een van de dingen die ik het leukst vond toen ik Spark begon te bestuderen, was de gelijkenis tussen de geschreven code ervoor en mijn gebruikelijke python+pandas-scripts. Het was heel gemakkelijk om te migreren.

Volgens dezelfde logica lijkt de streamingmodule van Spark sterk op de gebruikelijke Spark-code, waardoor het gemakkelijk is om van de batch-applicaties naar de stream-applicaties te migreren.

Dat gezegd hebbende, zullen we ons in de volgende secties concentreren op het leren van de specifieke kenmerken van Spark-gestructureerde streaming, dat wil zeggen welke nieuwe functies het heeft.

Onze eerste klus

Laten we langzaam beginnen en een speelgoedvoorbeeld bouwen

Het eerste dat u moet doen, is een Kafka-onderwerp maken van waaruit onze sparkjob de berichten zal consumeren.

Dit wordt gedaan door toegang tot de containerterminal van Kafka en uitvoeren:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Om te simuleren dat een producer berichten over dit onderwerp schrijft, gebruiken we de kafka-console-producent. Ook in de container:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Vanaf nu wordt elke regel die in de terminal wordt getypt, als bericht naar het testonderwerp verzonden. Het teken “:” wordt gebruikt om de sleutel en de waarde van het bericht te scheiden (sleutel:waarde).

Laten we een Spark-taak maken om dit onderwerp te bespreken.

De code moet in het /src/streaming map (niets bijzonders, alleen de map die ik heb gekozen).

Het belangrijkste om op te merken is dat we de attributen gebruiken leesStream en schrijfStream, in plaats van normaal lezen en schrijven. Dit is het belangrijkste aspect dat ervoor zorgt dat Spark ons ​​werk als een streamingapplicatie beschouwt.

Om verbinding te maken met Kafka is het noodzakelijk om de server en het onderwerp op te geven. De optie startOffsets=“vroegste” vertelt Spark dat hij het onderwerp vanaf het begin moet lezen. Ook omdat Kafka zijn berichten opslaat in binair vorm, waarnaar ze gedecodeerd moeten worden snaar.

De andere opties zullen verder worden onderzocht.

Laten we nu toegang krijgen tot de Spark-container en de taak uitvoeren.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Na een paar seconden configuratie begint het onderwerp te worden gebruikt.

Vonk consumerende berichten van Kafka. Afbeelding door auteur.

Spark Streaming werkt microbatch modus, en daarom zien we de ‘batch’-informatie wanneer de berichten worden verbruikt.

Micro-batching houdt het midden tussen volledige ‘echte’ streaming, waarbij alle berichten individueel worden verwerkt zodra ze binnenkomen, en de gebruikelijke batch, waarbij de gegevens statisch blijven en op aanvraag worden geconsumeerd. Spark zal enige tijd wachten met het verzamelen van berichten om ze samen te verwerken, waardoor de overhead wordt verminderd en de latentie toeneemt. Dit kan worden afgestemd op uw wensen.

Ik ben geen supersnelle typer, dus Spark verwerkt het bericht voordat ik nieuwe in de huidige batch kan opnemen.

En dat was onze eerste streamingklus!

Ik hoop dat je het gevoel krijgt: het is niet moeilijk om een ​​streamverwerkingstaak te coderen, maar er zijn enkele valkuilen.

Gegevens schrijven naar een Kafka-stream

Nu is het tijd om te gaan spelen met de sensorgegevens.

U kunt de ritssluiting bestand uit AUGUSTUS 2022 en pak het uit in het /gegevens volume. De gegevens bevinden zich oorspronkelijk in JSON en nemen ongeveer 23 GB aan ruimte in beslag. Het eerste dat u moet doen, is het naar parket converteren om de schijfruimte en leestijd te optimaliseren.

De spark-taken om dit te doen worden gedetailleerd beschreven in de GitHub-repository. Het enige wat u hoeft te doen is ze uit te voeren:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Afhankelijk van uw machine kan de uitvoering enige tijd duren. Maar het loont de moeite: de uiteindelijke parketbestandsgrootte is ~1Gb (meer dan 20x kleiner) en veel sneller te lezen.

We moeten ook het Kafka-onderwerp maken om onze berichten te ontvangen:

kafka-topics.sh --create --replicatiefactor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Als u de binnenkomende berichten wilt weergeven, is het optioneel mogelijk om een ​​consoleconsument in te stellen.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Het schrijven van gegevens over een Kafka-onderwerp is eenvoudig, maar bevat enkele details.

Bij gestructureerde streaming is het standaardgedrag om niet te proberen het gegevensschema (kolommen en hun typen) af te leiden, dus we moeten er een doorgeven.

Kafka-berichten zijn slechts binaire sleutel-waardeparen, dus we moeten onze gegevens in dit formaat weergeven. Dit kan eenvoudig worden bereikt door alle rijen naar JSON-strings te converteren, deze in binair coderen en het resultaat op te slaan in de kolom "waarde".

Kolommen transformeren naar JSON-tekenreeksen. Afbeelding door auteur.

Berichtsleutels zijn erg belangrijk in Kafka, maar zullen niet nuttig zijn in onze tests, dus alle berichten zullen hetzelfde hebben.

Zoals eerder vermeld is deze dataset ENORM, dus heb ik het aantal ingevoegde berichten beperkt tot 500,000.

Ten slotte passeren we de Kafka-server en het onderwerp en een “controlepuntLocatie”waar de vonk de voortgang van de uitvoering opslaat, handig om te herstellen van fouten.

Het uitvoeren van de klus:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Gegevens in Kafka invoegen. Afbeelding door auteur.

Aan de linkerkant leest de Spark-taak het bestand, aan de rechterkant: a kafka-console-consument toont de binnenkomende berichten.

Ons verkeersonderwerp is gevuld en bijna klaar om te worden verwerkt.

Het is belangrijk om te onthouden dat we een sparkjob hebben gebruikt om ons onderwerp in te vullen, alleen voor leerdoeleinden. In een reëel scenario sturen de sensoren zelf de metingen rechtstreeks naar Kafka.

Om dit dynamische gedrag te simuleren, schrijft het onderstaande script elke 1 seconde 2.5 rij naar het onderwerp.

Uitvoermodi — Het aantal voertuigen tellen per type

Laten we verder gaan en een taak maken om het aantal voertuigen per type te tellen.

De kolom “Classificação” (Classificatie) bevat het gedetecteerde voertuigtype.

Terwijl we uit het onderwerp lezen, moeten we de binaire JSON-tekenreeksen terug naar het kolomformaat converteren.

Zodra dit is gebeurd, kan de query zoals gewoonlijk worden samengesteld. Het is interessant om op te merken dat het vraaghart slechts het kiezen().groepDoor().tellen() reeks, de rest is relatief aan streaminglogica.

Het is dus tijd om de problemen aan te pakken uitvoermodus() keuze.

De uitvoermodus van een streamtoepassing specificeert hoe we de resultaten willen (her)berekenen en schrijven zodra er nieuwe gegevens binnenkomen.

Het kan drie verschillende waarden aannemen:

  • toevoegen: Voeg alleen nieuwe records toe aan de uitvoer.
  • Volledige: Bereken het volledige resultaat voor elke nieuwe record opnieuw.
  • bijwerken: Gewijzigde records bijwerken.

Deze modi kunnen wel of niet zinvol zijn, afhankelijk van de geschreven applicatie. De modus 'volledig' heeft bijvoorbeeld mogelijk geen zin als er sprake is van groeperen of sorteren.

Laten we de taak in de “volledige” modus uitvoeren en naar de resultaten kijken.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão - Vrachtwagen, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Afbeelding door auteur.

Terwijl nieuwe records in de stream worden ingevoegd (zie de terminal aan de rechterkant), berekent de taak het volledige resultaat opnieuw. Dit kan handig zijn in situaties waarin de rijvolgorde belangrijk is, zoals rangschikking of competitie.

Deze aanpak is echter mogelijk niet optimaal als het aantal groepen te groot is of als de individuele veranderingen geen invloed hebben op het algehele resultaat.

Een andere optie is dus om de uitvoermodus “update” te gebruiken, die alleen een nieuw bericht genereert voor de groepen die zijn gewijzigd. Zie hieronder:

De query met uitvoermodus “update”. Afbeelding door auteur.

De modus 'toevoegen' is niet beschikbaar voor zoekopdrachten met groepering, dus ik kan niet weergeven met dezelfde taak. Maar ik denk dat dit de eenvoudigste modus is altijd voegt een nieuw record toe aan de uitvoer.

Deze uitvoermodi zijn eenvoudiger te begrijpen als u erover nadenkt de resultaten in een tabel op te slaan. In de volledige uitvoermodus wordt de tabel herschreven voor elk nieuw verwerkt bericht, in de updatemodus alleen de regels waarop een update heeft plaatsgevonden, en de append voegt altijd een nieuwe regel toe aan het einde.

Tumbling-tijdvenster: aggregeren met behulp van tijdsintervallen

In streamingsystemen hebben berichten twee verschillende tijdstempels die daarmee verband houden: Gebeurtenistijd – het tijdstip waarop het bericht is gemaakt, in ons geval de leestijd van de sensor, en verwerkingstijd – wanneer het bericht wordt gelezen door de verwerkingsagent, in ons geval wanneer het bereikt Spark.

Een belangrijk kenmerk van tools voor stroomverwerking is de mogelijkheid om tijdsverwerking van gebeurtenissen af ​​te handelen. Tumbling-vensters zijn niet-overlappende vaste tijdsintervallen die worden gebruikt om aggregaties te maken met behulp van gebeurtenis-tijd-kolommen. Om het simpeler te zeggen: ze verdelen de tijdlijn in plakjes van gelijke grootte, zodat elke gebeurtenis tot een enkel interval behoort.

Tel bijvoorbeeld elke 5 minuten hoeveel voertuigen er in de afgelopen 5 minuten zijn gedetecteerd.

5 minuten tuimelend venster. Afbeelding door auteur.

De onderstaande code illustreert dit:

Dit soort verwerking kan in veel situaties uiterst nuttig zijn. Terugkomend op de eerder voorgestelde verkeersopstoppingsdetector: een mogelijke aanpak is om de gemiddelde snelheid van de voertuigen binnen een tijdsbestek van 10 minuten te meten en te kijken of deze onder een bepaalde drempel ligt.

Gebeurtenis-tijdverwerking is een complex onderwerp. Er kan van alles gebeuren als u ermee te maken krijgt, zoals berichten die verloren gaan, te laat aankomen of niet meer werken. Spark heeft verschillende mechanismen om de problemen te verzachten, zoals watermerken, waar we ons niet op zullen concentreren.

Tijdvensters kunnen ook worden gebruikt in combinatie met andere kolommen in de groepDoor(). In het onderstaande voorbeeld wordt het aantal voertuigen per type geteld in een tijdsbestek van 5 minuten.

Glijdend tijdvenster — Flexibilisering van de tijdsintervallen

Glijdende tijdvensters zijn een flexibilisering van tuimelende vensters. In plaats van niet-overlappende intervallen te creëren, kunt u definiëren hoe vaak elk interval wordt gemaakt.

Tel bijvoorbeeld elke 5 minuten hoeveel voertuigen er in de afgelopen 30 minuten zijn gedetecteerd.

Hierdoor kunnen gebeurtenissen tot vele intervallen behoren en zo vaak als nodig worden geteld.

Om een ​​schuifvenster te definiëren, hoeft u alleen maar het update-interval door te geven aan de venster() functie.

Laten we de uitvoer bekijken.

Zoals we kunnen zien, worden er elke 30 minuten vensters van 5 minuten gemaakt.

Deze flexibiliteit kan heel nuttig zijn om specifiekere bedrijfsregels en complexere triggers te definiëren. Onze filedetector kan bijvoorbeeld elke 5 seconden reacties sturen over de afgelopen 10 minuten en een waarschuwing genereren wanneer de gemiddelde autosnelheid onder de 20 km/u zakt.

Dit was een snelle blik op de belangrijkste concepten van Spark Structured Streaming en hoe deze met Kafka kunnen worden toegepast.

Apache Kafka en Apache Spark zijn zowel betrouwbare als robuuste tools die door veel bedrijven worden gebruikt om dagelijks ongelooflijke hoeveelheden gegevens te verwerken, waardoor ze een van de sterkste paren zijn in de stroomverwerkingstaak.

We hebben geleerd hoe we Kafka-onderwerpen kunnen invullen, gebruiken en verwerken met behulp van Spark-taken. Dit was geen moeilijke taak, zoals vermeld in het bericht, de API voor streamverwerking is bijna gelijk aan de gebruikelijke batch-API, met slechts enkele kleine aanpassingen.

We hebben ook verschillende uitvoermodi besproken, iets specifieks voor het streamen van applicaties, en hoe elke modus kan worden gebruikt. Last but not least hebben we aggregaties met tijdvensters onderzocht, een van de belangrijkste mogelijkheden van streamverwerking.

Nogmaals, dit was een snelle blik, en ik zal hieronder enkele referenties achterlaten als je er dieper op in wilt gaan.

Ik hoop dat ik op de een of andere manier heb geholpen, bedankt voor het lezen! 🙂

Hierin is alle code beschikbaar GitHub-repository.
Gebruikte gegevens —
Contagens Volumétricas de Radares, Open data, Braziliaanse regering

[1] Functie Deep Dive: watermerken in Apache Spark gestructureerde streaming — Max Fisher op Databricks-blog
[2] Chambers, B., en Zaharia, M. (2018). Spark: De definitieve gids: Big data-verwerking eenvoudig gemaakt. "O'Reilly Media, Inc.".
[3] Realtime logistiek, verzending en transport met Apache Kafka– Kai Waehner
[4] Met Apache Kafka in de Netflix Studio en Finance World - Confluente blog
[5] Spark-streaming en Kafka — https://sparkbyexamples.com/

Een snelle blik op Spark Structured Streaming + Kafka opnieuw gepubliceerd vanaf bron https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Tijdstempel:

Meer van Blockchain-adviseurs