Ein kurzer Blick auf Spark Structured Streaming + Kafka

Erlernen Sie die Grundlagen zur Verwendung dieses leistungsstarken Duos für Stream-Verarbeitungsaufgaben

Photo by Nikhita Singhal on Unsplash

Vor kurzem habe ich begonnen, mich intensiv mit Apache Kafka und Apache Spark zu befassen, zwei führenden Technologien in der Welt der Datentechnik.

Ich habe in den letzten Monaten mehrere Projekte mit ihnen gemacht; „Streaming von maschinellem Lernen mit Kafka, Debezium und BentoML" ist ein Beispiel. Mein Fokus liegt darauf, zu lernen, wie man mit diesen modernen, berühmten Tools leistungsstarke Datenpipelines erstellt, und ein Gefühl für ihre Vor- und Nachteile zu bekommen.

In den letzten Monaten habe ich bereits erläutert, wie man ETL-Pipelines mit beiden Tools erstellt, jedoch nie zusammen, und das ist die Lücke, die ich heute schließen werde.

Unser Ziel ist es, die allgemeine Idee hinter der Erstellung einer Streaming-Anwendung mit Spark+Kafka kennenzulernen und anhand realer Daten einen kurzen Einblick in die Hauptkonzepte zu geben.

Die Idee ist einfach – Apache Kafka ist ein Nachrichten-Streaming-Tool, bei dem Produzenten Nachrichten an ein Ende einer Warteschlange schreiben (genannt a Thema) zum anderen von den Verbrauchern gelesen werden.

Aber es handelt sich um ein sehr komplexes Tool, das als robuster verteilter Messaging-Dienst mit allen möglichen Zustellungsgarantien (exactly Once, Once, Any), Nachrichtenspeicherung und Nachrichtenreplikation konzipiert ist und gleichzeitig Flexibilität, Skalierbarkeit und hohen Durchsatz ermöglicht. Es verfügt über ein breiteres Spektrum an Anwendungsfällen, wie z. B. Microservices-Kommunikation, Echtzeit-Ereignissysteme und Streaming-ETL-Pipelines.

Apache Spark ist eine verteilte speicherbasierte Datentransformations-Engine.

Es handelt sich außerdem um ein sehr komplexes Tool, das sich mit allen Arten von Datenbanken, Dateisystemen und Cloud-Infrastrukturen verbinden kann. Es ist auf den Betrieb in verteilten Umgebungen ausgerichtet, um die Verarbeitung zwischen Maschinen zu parallelisieren und mithilfe seiner Lazy-Evaluation-Philosophie und Abfrageoptimierungen leistungsstarke Transformationen zu erreichen.

Das Coole daran ist, dass der Code letztendlich nur Ihre übliche SQL-Abfrage oder (fast) Ihr Python+Pandas-Skript ist, wobei die ganze Hexerei unter einer schönen, benutzerfreundlichen High-Level-API abstrahiert ist.

Kombinieren Sie diese beiden Technologien und wir haben die perfekte Ergänzung zum Aufbau einer Streaming-ETL-Pipeline.

Wir nutzen die Daten von Verkehrssensoren in der Stadt Belo Horizonte (BH), der Hauptstadt von Minas Gerais (Brasilien). Es handelt sich um einen riesigen Datensatz, der Messungen des Verkehrsflusses an mehreren Orten in der Stadt enthält. Jeder Sensor erkennt regelmäßig die Art des an diesem Ort fahrenden Fahrzeugs (Auto, Motorrad, Bus/LKW), seine Geschwindigkeit und Länge (sowie andere Informationen, die wir nicht verwenden werden).

Dieser Datensatz stellt genau eine der klassischen Anwendungen für Streaming-Systeme dar – eine Gruppe von Sensoren, die ihre Messwerte kontinuierlich aus dem Feld senden.

In diesem Szenario kann Apache Kafka als Abstraktionsschicht zwischen den Sensoren und den Anwendungen, die ihre Daten konsumieren, verwendet werden.

Kafka nutzte es als Abstraktionsschicht zwischen Quellen und Diensten. Bild vom Autor.

Mit dieser Art von Infrastruktur ist es möglich, alle Arten von (sogenannten) Echtzeit-ereignisgesteuerte Systeme, wie ein Programm zur Erkennung und Warnung vor Staus, wenn die Anzahl der Fahrzeuge bei sinkender Durchschnittsgeschwindigkeit plötzlich zunimmt.

Und hier kommt Apache Spark ins Spiel.

Es verfügt über ein natives Modul für die Stream-Verarbeitung namens Spark strukturiertes Streaming, das sich mit Kafka verbinden und dessen Nachrichten verarbeiten kann.

Umgebung einrichten

Sie benötigen lediglich Docker und Docker-Compose.

Wir verwenden eine Docker-Compose-Dateikonfiguration basierend auf den folgenden Repositorys: Link Funke, Link Kafka.

Das ./src Volumen ist der Ort, an dem wir unsere Skripte ablegen werden.

Um die Umgebung zu starten, führen Sie sie einfach aus

Docker-komponieren

Der gesamte Code ist hier verfügbar GitHub-Repository.

Eines der Dinge, die mir am meisten gefielen, als ich anfing, Spark zu studieren, war die Ähnlichkeit zwischen dem dafür geschriebenen Code und meinen üblichen Python+Pandas-Skripten. Die Migration war sehr einfach.

Der gleichen Logik folgend ist das Streaming-Modul von Spark dem üblichen Spark-Code sehr ähnlich, was die Migration von Batch-Anwendungen zu Stream-Anwendungen erleichtert.

Vor diesem Hintergrund konzentrieren wir uns in den folgenden Abschnitten darauf, die Besonderheiten des strukturierten Spark-Streamings kennenzulernen, d. h. welche neuen Funktionen es bietet.

Unser erster Auftrag

Fangen wir langsam an und bauen ein Beispielspielzeug

Als Erstes erstellen Sie ein Kafka-Thema, aus dem unser Spark-Job die Nachrichten konsumiert.

Dies geschieht durch Zugang zum Kafka-Containerterminal und ausführen:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Um zu simulieren, dass ein Produzent Nachrichten zu diesem Thema schreibt, verwenden wir die Kafka-Konsolenproduzent. Außerdem im Container:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Von nun an wird jede im Terminal eingegebene Zeile als Nachricht an das Testthema gesendet. Das Zeichen „:“ wird verwendet, um den Schlüssel und den Wert der Nachricht (Schlüssel:Wert) zu trennen.

Erstellen wir einen Spark-Job, um dieses Thema zu nutzen.

Der Code muss in das eingefügt werden /src/streaming Ordner (nichts Besonderes, nur der Ordner, den ich ausgewählt habe).

Das Wichtigste ist, dass wir die Attribute verwenden readStream und writeStream, statt normalem Lesen und Schreiben. Dies ist der Hauptaspekt, der Spark dazu bringt, unseren Job als Streaming-Anwendung zu behandeln.

Um eine Verbindung zu Kafka herzustellen, ist die Angabe des Servers und des Themas erforderlich. Die Option StartingOffsets=“frühestens“ weist Spark an, das Thema von Anfang an zu lesen. Auch weil Kafka seine Nachrichten in speichert binär Form, in die sie dekodiert werden müssen Schnur.

Die anderen Optionen werden weiter untersucht.

Jetzt greifen wir auf den Spark-Container zu und führen den Job aus.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Nach einigen Sekunden der Konfiguration beginnt das Thema zu konsumieren.

Funkenverzehrende Botschaften von Kafka. Bild vom Autor.

Spark Streaming funktioniert Mikro-Batching Modus, und deshalb sehen wir die „Batch“-Informationen, wenn die Nachrichten konsumiert werden.

Micro-Batching liegt in etwa zwischen dem vollständigen „echten“ Streaming, bei dem alle Nachrichten bei ihrem Eintreffen einzeln verarbeitet werden, und dem üblichen Batch, bei dem die Daten statisch bleiben und bei Bedarf verbraucht werden. Spark wartet einige Zeit und versucht, Nachrichten zu sammeln, um sie gemeinsam zu verarbeiten, wodurch der Overhead reduziert und die Latenz erhöht wird. Dies kann auf Ihre Bedürfnisse abgestimmt werden.

Da ich nicht besonders schnell schreibe, verarbeitet Spark die Nachricht, bevor ich neue in den aktuellen Stapel einfügen kann.

Und das war unser erster Streaming-Job!

Ich hoffe, dass Sie das Gefühl bekommen: Es ist nicht schwer, einen Stream-Verarbeitungsjob zu programmieren, aber es gibt einige Fallstricke.

Daten in einen Kafka-Stream schreiben

Jetzt ist es an der Zeit, mit den Sensordaten zu spielen.

Ist Sie können die Reißverschluss Datei aus AUGUST 2022 und extrahieren Sie sie in die /Daten Volumen. Die Daten liegen ursprünglich in JSON vor und belegen etwa 23 GB Speicherplatz. Der erste Schritt besteht darin, es in Parkett umzuwandeln, um den Speicherplatz und die Lesezeit zu optimieren.

Die hierfür erforderlichen Spark-Jobs sind im GitHub-Repository detailliert beschrieben. Sie müssen sie lediglich ausführen:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Abhängig von Ihrer Maschine kann die Ausführung einige Zeit dauern. Aber es zahlt sich aus: Die endgültige Parquet-Dateigröße beträgt ~1 GB (mehr als 20-mal kleiner) und ist viel schneller zu lesen.

Wir müssen auch das Kafka-Thema erstellen, um unsere Nachrichten zu empfangen:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

Wenn Sie die eingehenden Nachrichten anzeigen möchten, können Sie optional einen Konsolenkonsumenten einrichten.

kafka-console-consumer.sh --topic Traffic_sensor --bootstrap-server localhost:9092

Das Schreiben von Daten zu einem Kafka-Thema ist einfach, weist jedoch einige Details auf.

Beim strukturierten Streaming besteht das Standardverhalten darin, nicht zu versuchen, das Datenschema (Spalten und ihre Typen) abzuleiten, daher müssen wir eines übergeben.

Kafka-Nachrichten sind lediglich Schlüssel-Wert-Binärzeichenfolgenpaare, daher müssen wir unsere Daten in diesem Format darstellen. Dies lässt sich leicht erreichen, indem man alle Zeilen in JSON-Strings konvertiert, sie binär kodiert und das Ergebnis in der Spalte „Wert“ speichert.

Spalten in JSON-Strings umwandeln. Bild vom Autor.

Nachrichtenschlüssel sind in Kafka sehr wichtig, aber in unseren Tests werden sie nicht nützlich sein, sodass alle Nachrichten dasselbe haben.

Wie bereits erwähnt, ist dieser Datensatz RIESIG, daher habe ich die Anzahl der eingefügten Nachrichten auf 500,000 begrenzt.

Schließlich übergeben wir den Kafka-Server und das Thema und ein „PrüfpunktStandort”Hier speichert der Spark den Ausführungsfortschritt, was für die Wiederherstellung nach Fehlern nützlich ist.

Ausführung des Jobs:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Daten in Kafka einfügen. Bild vom Autor.

Links liest der Spark-Job die Datei, rechts a kafka-console-consumer zeigt die eingegangenen Nachrichten an.

Unser Verkehrsthema ist ausgefüllt und fast bereit zur Bearbeitung.

Es ist wichtig, sich daran zu erinnern, dass wir einen Spark-Job verwendet haben, um unser Thema nur zu Lernzwecken zu füllen. In einem realen Szenario senden die Sensoren selbst Messwerte direkt an Kafka.

Um dieses dynamische Verhalten zu simulieren, schreibt das folgende Skript alle 1 Sekunden eine Zeile in das Thema.

Ausgabemodi – Zählen der Anzahl der Fahrzeuge nach Typ

Als nächstes erstellen wir einen Job, um die Anzahl der Fahrzeuge nach Typ zu zählen.

Die Spalte „Classificação“ (Klassifizierung) enthält den erkannten Fahrzeugtyp.

Während wir das Thema lesen, müssen wir die JSON-Binärzeichenfolgen zurück in das Spaltenformat konvertieren.

Sobald dies erledigt ist, kann die Abfrage wie gewohnt erstellt werden. Es ist interessant festzustellen, dass das Abfrageherz genau das ist wählen().gruppiere nach().zählen()-Sequenz, der Rest ist relativ zur Streaming-Logik.

Es ist also an der Zeit, sich mit dem Problem zu befassen Ausgabemodus() Möglichkeit.

Der Ausgabemodus einer Stream-Anwendung gibt an, wie wir die Ergebnisse (neu) berechnen und schreiben möchten, wenn neue Daten eintreffen.

Es kann drei verschiedene Werte annehmen:

  • Anhängen: Nur neue Datensätze zur Ausgabe hinzufügen.
  • Komplett: Berechnen Sie das vollständige Ergebnis für jeden neuen Datensatz neu.
  • Aktualisierung: Geänderte Datensätze aktualisieren.

Diese Modi können je nach geschriebener Anwendung sinnvoll sein oder auch nicht. Beispielsweise ist der Modus „Vollständig“ möglicherweise nicht sinnvoll, wenn eine Gruppierung oder Sortierung durchgeführt wird.

Lassen Sie uns den Job im Modus „Abgeschlossen“ ausführen und die Ergebnisse betrachten.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão – LKW, Automóvel-Car, Indefinido-Undefiniert, Ônibus-Bus, Moto-Motorrad. Bild vom Autor.

Wenn neue Datensätze in den Stream eingefügt werden (siehe Terminal rechts), berechnet der Job das vollständige Ergebnis neu. Dies kann in Situationen nützlich sein, in denen die Zeilenreihenfolge wichtig ist, z. B. in der Rangfolge oder im Wettbewerb.

Dieser Ansatz ist jedoch möglicherweise nicht optimal, wenn die Anzahl der Gruppen zu groß ist oder die einzelnen Änderungen keinen Einfluss auf das Gesamtergebnis haben.

Eine andere Möglichkeit besteht darin, den Ausgabemodus „Aktualisieren“ zu verwenden, der nur für die Gruppen, die sich geändert haben, eine neue Nachricht generiert. Siehe unten:

Die Abfrage mit Ausgabemodus „Update“. Bild vom Autor.

Der Modus „Anhängen“ ist für Abfragen mit Gruppierung nicht verfügbar, daher kann ich nicht anzeigen, dass derselbe Job verwendet wird. Aber ich denke, dass es der einfachste Modus ist immer Fügt der Ausgabe einen neuen Datensatz hinzu.

Diese Ausgabemodi sind einfacher zu verstehen, wenn Sie darüber nachdenken, die Ergebnisse in einer Tabelle zu speichern. Im vollständigen Ausgabemodus wird die Tabelle für jede neue verarbeitete Nachricht neu geschrieben, im Aktualisierungsmodus werden nur die Zeilen neu geschrieben, in denen eine Aktualisierung stattgefunden hat, und das Anhängen fügt am Ende immer eine neue Zeile hinzu.

Taumelndes Zeitfenster – Aggregation mithilfe von Zeitintervallen

In Streaming-Systemen sind Nachrichten mit zwei verschiedenen Zeitstempeln verknüpft: Ereigniszeit – die Zeit, zu der die Nachricht erstellt wurde, in unserem Fall die Lesezeit des Sensors, und Verarbeitungszeit – Zeitpunkt, zu dem die Nachricht vom Verarbeitungsagenten gelesen wird, in unserem Fall wann es erreicht Spark.

Ein wichtiges Merkmal von Stream-Verarbeitungstools ist die Fähigkeit, die Ereigniszeitverarbeitung zu verarbeiten. Taumelnde Fenster sind nicht überlappende feste Zeitintervalle, die zur Aggregation mithilfe von Ereignis-Zeit-Spalten verwendet werden. Um es einfacher auszudrücken: Sie unterteilen die Zeitachse in gleich große Abschnitte, sodass jedes Ereignis zu einem einzelnen Intervall gehört.

Zählen Sie beispielsweise alle 5 Minuten, wie viele Fahrzeuge in den letzten 5 Minuten erkannt wurden.

5-minütiges Taumelfenster. Bild vom Autor.

Der folgende Code veranschaulicht dies:

Diese Art der Verarbeitung kann in vielen Situationen äußerst nützlich sein. Um auf den zuvor vorgeschlagenen Staudetektor zurückzukommen: Ein möglicher Ansatz besteht darin, die Durchschnittsgeschwindigkeit der Fahrzeuge in einem 10-Minuten-Fenster zu messen und zu prüfen, ob sie unter einem bestimmten Schwellenwert liegt.

Die Ereigniszeitverarbeitung ist ein komplexes Thema. Beim Umgang damit kann alles passieren, zum Beispiel, dass Nachrichten verloren gehen, zu spät ankommen oder nicht mehr in Ordnung sind. Spark verfügt über mehrere Mechanismen, um die Probleme zu entschärfen, z Wasserzeichen, auf die wir uns nicht konzentrieren werden.

Zeitfenster können auch in Verbindung mit anderen Spalten im verwendet werden gruppiere nach(). Das folgende Beispiel zählt die Anzahl der Fahrzeuge nach Typ in einem 5-Minuten-Fenster.

Gleitendes Zeitfenster – Flexibilisierung der Zeitintervalle

Gleitende Zeitfenster sind eine Flexibilisierung von Taumelfenstern. Anstatt nicht überlappende Intervalle zu erstellen, ermöglichen sie die Definition, wie oft jedes Intervall erstellt wird.

Zählen Sie beispielsweise alle 5 Minuten, wie viele Fahrzeuge in den letzten 30 Minuten erkannt wurden.

Aus diesem Grund können Ereignisse zu vielen Intervallen gehören und so oft wie nötig gezählt werden.

Um ein Schiebefenster zu definieren, übergeben Sie einfach das Aktualisierungsintervall an Fenster() Funktion.

Schauen wir uns die Ausgabe an.

Wie wir sehen können, werden alle 30 Minuten 5-Minuten-Fenster erstellt.

Diese Flexibilität kann sehr nützlich sein, um spezifischere Geschäftsregeln und komplexere Auslöser zu definieren. Beispielsweise kann unser Staudetektor alle 5 Sekunden Antworten über die letzten 10 Minuten senden und eine Warnung auslösen, wenn die durchschnittliche Fahrzeuggeschwindigkeit unter 20 km/h fällt.

Dies war ein kurzer Blick auf die Hauptkonzepte von Spark Structured Streaming und wie sie mit Kafka angewendet werden können.

Apache Kafka und Apache Spark sind beide zuverlässige und robuste Tools, die von vielen Unternehmen zur täglichen Verarbeitung unglaublicher Datenmengen eingesetzt werden, was sie zu einem der stärksten Paare bei der Stream-Verarbeitungsaufgabe macht.

Wir haben gelernt, wie man Kafka-Themen mithilfe von Spark-Jobs füllt, konsumiert und verarbeitet. Dies war keine schwierige Aufgabe, da die Stream-Verarbeitungs-API, wie im Beitrag erwähnt, fast gleich der üblichen Batch-API ist, mit nur einigen geringfügigen Anpassungen.

Wir haben auch verschiedene Ausgabemodi besprochen, etwas Spezielles für Stream-Anwendungen und wie jeder einzelne verwendet werden kann. Zu guter Letzt haben wir Aggregationen mit Zeitfenstern untersucht, eine der Hauptfunktionen der Stream-Verarbeitung.

Auch dies war nur ein kurzer Blick, und ich werde unten einige Referenzen hinterlassen, wenn Sie tiefer in die Materie eintauchen möchten.

Ich hoffe, ich habe irgendwie geholfen, danke fürs Lesen! 🙂

Der gesamte Code ist hier verfügbar GitHub-Repository.
Verwendete Daten –
Contagens Volumétricas de Radares, Offene Daten, brasilianischer Gouverneur.

[1] Feature Deep Dive: Wasserzeichen im strukturierten Apache Spark-Streaming – Max Fisher im Databricks-Blog
[2] Chambers, B. & Zaharia, M. (2018). Spark: Der ultimative Leitfaden: Big-Data-Verarbeitung leicht gemacht. „O’Reilly Media, Inc.“
[3] Logistik, Versand und Transport in Echtzeit mit Apache Kafka— Kai Waehner
[4] Mit Apache Kafka in der Netflix Studio- und Finanzwelt — Confluent-Blog
[5] Spark Streaming & Kafka – https://sparkbyexamples.com/

Ein kurzer Blick auf Spark Structured Streaming + Kafka, neu veröffentlicht aus der Quelle https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 über https:/ /towardsdatascience.com/feed

<!–

->

Zeitstempel:

Mehr von Blockchain-Berater