Un aperçu rapide de Spark Structured Streaming + Kafka

Apprendre les bases de l'utilisation de ce puissant duo pour les tâches de traitement de flux

Photo par Nikhita Singhal on Unsplash

Récemment, j'ai commencé à beaucoup étudier Apache Kafka et Apache Spark, deux technologies leaders dans le monde de l'ingénierie des données.

J'ai réalisé plusieurs projets en les utilisant au cours des derniers mois ; "Streaming d'apprentissage automatique avec Kafka, Debezium et BentoML» en est un exemple. Mon objectif est d'apprendre à créer de puissants pipelines de données avec ces outils modernes célèbres et d'avoir une idée de leurs avantages et de leurs inconvénients.

Au cours des derniers mois, j'ai déjà expliqué comment créer des pipelines ETL en utilisant les deux outils, mais sans jamais les utiliser ensemble, et c'est la lacune que je vais combler aujourd'hui.

Notre objectif est d'apprendre l'idée générale derrière la création d'une application de streaming avec Spark+Kafka et de donner un aperçu rapide de ses principaux concepts en utilisant des données réelles.

L'idée est simple : Apache Kafka est un outil de streaming de messages, dans lequel les producteurs écrivent des messages à une extrémité d'une file d'attente (appelée file d'attente). sujet) pour être lu par les consommateurs d'autre part.

Mais il s'agit d'un outil très complexe, conçu pour être un service de messagerie distribué résilient, avec toutes sortes de garanties de livraison (exactement une fois, une fois, n'importe quand), le stockage et la réplication des messages, tout en permettant également flexibilité, évolutivité et débit élevé. Il propose un ensemble plus large de cas d'utilisation, tels que la communication par microservices, les systèmes d'événements en temps réel et les pipelines ETL de streaming.

Apache Spark est un moteur de transformation de données basé sur la mémoire distribuée.

C'est également un outil très complexe, capable de se connecter à toutes sortes de bases de données, de systèmes de fichiers et d'infrastructures cloud. Il est conçu pour fonctionner dans des environnements distribués afin de paralléliser le traitement entre les machines, réalisant des transformations hautes performances en utilisant sa philosophie d'évaluation paresseuse et ses optimisations de requêtes.

Ce qui est intéressant, c'est qu'en fin de compte, le code n'est plus que votre requête SQL habituelle ou (presque) votre script Python+pandas, avec toute la sorcellerie résumée sous une API de haut niveau conviviale et agréable.

Rejoignez ces deux technologies et nous avons une adéquation parfaite pour créer un pipeline ETL de streaming.

Nous utiliserons les données des capteurs de trafic de la ville de Belo Horizonte (BH), la capitale du Minas Gerais (Brésil). Il s'agit d'un énorme ensemble de données contenant des mesures du flux de circulation à plusieurs endroits de la ville. Chaque capteur détecte périodiquement le type de véhicule circulant à cet endroit (voiture, moto, bus/camion), sa vitesse et sa longueur (et d'autres informations que nous n'utiliserons pas).

Cet ensemble de données représente précisément l’une des applications classiques des systèmes de streaming : un groupe de capteurs envoyant leurs mesures en continu depuis le terrain.

Dans ce scénario, Apache Kafka peut être utilisé comme couche d'abstraction entre les capteurs et les applications qui consomment leurs données.

Kafka est utilisé comme couche d'abstraction entre les sources et les services. Image par auteur.

Avec ce type d'infrastructure, il est possible de construire toutes sortes de (ce qu'on appelle) systèmes pilotés par événements en temps réel, comme un programme pour détecter et alerter les embouteillages lorsque le nombre de véhicules augmente soudainement avec une baisse de la vitesse moyenne.

Et c'est là qu'Apache Spark entre en jeu.

Il dispose d'un module natif pour le traitement des flux appelé Spark Streaming structuré, qui peut se connecter à Kafka et traiter ses messages.

Mise en place de l'environnement

Tout ce dont vous avez besoin est de docker et de docker-compose.

Nous utiliserons une configuration de fichier docker-compose basée sur les référentiels suivants : lien étincelle, lien kafka.

Les ./src le volume est l'endroit où nous allons mettre nos scripts.

Pour démarrer l'environnement, exécutez simplement

docker-compose jusqu'à

Tout le code est disponible dans ce GitHub référentiel.

L'une des choses que j'ai le plus appréciées lorsque j'ai commencé à étudier Spark était la similitude entre le code écrit et mes scripts python+pandas habituels. C'était très facile de migrer.

Suivant la même logique, le module de streaming de Spark est très similaire au code Spark habituel, facilitant la migration des applications batch vers celles de flux.

Cela dit, dans les sections suivantes, nous nous concentrerons sur l'apprentissage des spécificités du streaming structuré Spark, c'est-à-dire de ses nouvelles fonctionnalités.

Notre premier travail

Commençons lentement et construisons un exemple de jouet

La première chose à faire est de créer un sujet Kafka à partir duquel notre travail Spark consommera les messages.

Ceci est fait par accéder au terminal à conteneurs Kafka et exécuter :

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Pour simuler un producteur écrivant des messages sur ce sujet, utilisons le kafka-console-producteur. Également à l'intérieur du conteneur :

kafka-console-producter.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Désormais, chaque ligne saisie dans le terminal sera envoyée sous forme de message au sujet de test. Le caractère « : » est utilisé pour séparer la clé et la valeur du message (clé:valeur).

Créons une tâche Spark pour utiliser ce sujet.

Le code doit être mis à l'intérieur du /src/streaming dossier (rien de spécial, juste le dossier que j'ai choisi).

L'essentiel à noter est que nous utilisons les attributs lireStream ainsi que écrireStream, au lieu de la lecture et de l'écriture normales. C'est l'aspect principal qui fait que Spark considère notre travail comme une application de streaming.

Pour se connecter à Kafka, il est nécessaire de préciser le serveur et le sujet. L'option StartingOffsets="au plus tôt » indique à Spark de lire le sujet depuis le début. De plus, comme Kafka stocke ses messages dans binaire forme, ils doivent être décodés pour un magnifique.

Les autres options seront étudiées plus en détail.

Maintenant, accédons au conteneur Spark et exécutons le travail.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Après quelques secondes de configuration, il commencera à consommer le sujet.

Spark consomme des messages de Kafka. Image par auteur.

Spark Streaming fonctionne dans micro-dosage mode, et c'est pourquoi nous voyons les informations « batch » lorsqu'il consomme les messages.

Le micro-batching se situe quelque peu entre le « vrai » streaming complet, où tous les messages sont traités individuellement à leur arrivée, et le batch habituel, où les données restent statiques et sont consommées à la demande. Spark attendra un certain temps pour essayer d'accumuler les messages pour les traiter ensemble, réduisant ainsi la surcharge et augmentant la latence. Cela peut être adapté à vos besoins.

Je ne suis pas un type très rapide, donc Spark traite le message avant de pouvoir en inclure de nouveaux dans le lot actuel.

Et c'était notre premier travail de streaming !

J'espère que vous avez le sentiment : ce n'est pas difficile de coder un travail de traitement de flux, mais il y a quelques pièges.

Écrire des données dans un flux Kafka

Il est maintenant temps de commencer à jouer avec les données des capteurs.

Vous pouvez télécharger le Zip *: français fichier d’AOÛT 2022 et extrayez-le dans le /Les données volume. Les données sont à l'origine au format JSON et occupent environ 23 Go d'espace. La première chose à faire est de le convertir en parquet pour optimiser l'espace disque et le temps de lecture.

Les jobs spark pour faire cela sont détaillés dans le dépôt GitHub, il vous suffit de les exécuter :

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Selon votre machine, l'exécution peut prendre un certain temps. Mais cela s'avère payant, la taille finale du fichier parquet est d'environ 1 Go (plus de 20 fois plus petite) et beaucoup plus rapide à lire.

Il faut également créer le sujet Kafka pour recevoir nos messages :

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

En option, si vous souhaitez afficher les messages arrivant, il est possible de configurer un consommateur de console.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Écrire des données sur un sujet Kafka est simple, mais comporte quelques détails.

Dans le streaming structuré, le comportement par défaut est de ne pas essayer de déduire le schéma de données (colonnes et leurs types), nous devons donc en transmettre un.

Les messages Kafka ne sont que des paires de chaînes binaires clé-valeur, nous devons donc représenter nos données dans ce format. Ceci peut être facilement réalisé en convertissant toutes les lignes en chaînes JSON, en les codant en binaire et en stockant le résultat dans la colonne « valeur ».

Transformation des colonnes en chaînes JSON. Image par auteur.

Les clés de message sont très importantes dans Kafka, mais elles ne seront pas utiles dans nos tests, donc tous les messages auront la même chose.

Comme mentionné précédemment, cet ensemble de données est ÉNORME, j'ai donc limité le nombre de messages insérés à 500,000 XNUMX.

Enfin, on passe le serveur et le sujet Kafka et un «point de contrôleEmplacement" où le spark stockera la progression de l'exécution, utile pour récupérer des erreurs.

Exécution du travail :

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Insertion de données dans Kafka. Image par auteur.

A gauche, le job Spark lit le fichier, à droite, un kafka-console-consommateur affiche les messages arrivant.

Notre sujet de trafic est rempli et presque prêt à être traité.

Il est important de se rappeler que nous avons utilisé un travail Spark pour remplir notre sujet uniquement à des fins d'apprentissage. Dans un scénario réel, les capteurs eux-mêmes enverront les relevés directement à Kafka.

Pour simuler ce comportement dynamique, le script ci-dessous écrit 1 ligne dans le sujet toutes les 2.5 secondes.

Modes de sortie — Comptage du nombre de véhicules par type

Passons maintenant à la création d'un travail pour compter le nombre de véhicules par type.

La colonne « Classificação » (Classification) contient le type de véhicule détecté.

Pendant que nous lisons le sujet, nous devons reconvertir les chaînes binaires JSON au format en colonnes.

Une fois cela fait, la requête peut être construite comme d'habitude. Il est intéressant de noter que le cœur de la requête n'est que le Sélectionner()par groupe()compter(), tout le reste est relatif à la logique de streaming.

Il est donc temps d'aborder le Mode de sortie() option.

Le mode de sortie d'une application de flux spécifie comment nous voulons (re)calculer et écrire les résultats à mesure que de nouvelles données arrivent.

Il peut prendre trois valeurs différentes :

  • Ajouter: Ajoutez uniquement de nouveaux enregistrements à la sortie.
  • !: Recalculez le résultat complet pour chaque nouvel enregistrement.
  • Mises à jour: Mettre à jour les enregistrements modifiés.

Ces modes peuvent ou non avoir un sens selon l'application écrite. Par exemple, le mode « complet » peut ne pas avoir de sens si un regroupement ou un tri est effectué.

Exécutons le travail en mode « complet » et regardons les résultats.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Camion, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Image par auteur.

Au fur et à mesure que de nouveaux enregistrements sont insérés dans le flux (voir le terminal à droite), le travail recalcule le résultat complet. Cela peut être utile dans les situations où l'ordre des lignes est important, comme le classement ou la compétition.

Cependant, cette approche peut ne pas être optimale si le nombre de groupes est trop grand ou si les changements individuels n'ont pas d'impact sur le résultat global.

Ainsi, une autre option consiste à utiliser le mode de sortie « mise à jour », qui génère un nouveau message uniquement pour les groupes qui ont changé. Voir ci-dessous:

La requête avec le mode de sortie « mise à jour ». Image par auteur.

Le mode « ajouter » n'est pas disponible pour les requêtes avec regroupement, je ne pourrai donc pas afficher en utilisant le même travail. Mais je pense que c'est le mode le plus simple, il toujours ajoute un nouvel enregistrement à la sortie.

Ces modes de sortie sont plus simples à comprendre si vous envisagez de sauvegarder les résultats dans un tableau. En mode sortie complète, le tableau sera réécrit pour chaque nouveau message traité, en mode mise à jour, uniquement les lignes où une mise à jour a eu lieu, et l'ajout ajoutera toujours une nouvelle ligne à la fin.

Fenêtre temporelle de culbutage – Agrégation à l'aide d'intervalles de temps

Dans les systèmes de streaming, les messages sont associés à deux horodatages différents : Heure de l'événement – L'heure à laquelle le message a été créé, dans notre cas l'heure de lecture du capteur, et Heure de traitement – Lorsque le message est lu par l'agent de traitement, dans notre cas lorsque il atteint Spark.

Une caractéristique importante des outils de traitement de flux est la capacité à gérer le traitement du temps des événements. Les fenêtres culbutantes sont des intervalles de temps fixes sans chevauchement utilisés pour effectuer des agrégations à l'aide de colonnes d'heure d'événement. Pour le dire plus simplement, ils découpent la chronologie en tranches de taille égale afin que chaque événement appartienne à un seul intervalle.

Par exemple, comptez toutes les 5 minutes combien de véhicules ont été détectés au cours des 5 dernières minutes.

Fenêtre de culbutage de 5 minutes. Image par auteur.

Le code ci-dessous illustre ceci :

Ce type de traitement peut être extrêmement utile dans de nombreuses situations. Pour en revenir au détecteur d'embouteillage proposé précédemment, une approche possible consiste à mesurer la vitesse moyenne des véhicules sur une fenêtre de 10 minutes et à voir si elle est inférieure à un certain seuil.

Le traitement au moment des événements est un sujet complexe. Tout peut arriver lorsqu'on y fait face, comme la perte de messages, l'arrivée trop tard ou le désordre. Spark dispose de plusieurs mécanismes pour tenter d'atténuer les problèmes, comme filigranes, sur lequel nous ne nous concentrerons pas.

Les fenêtres horaires peuvent également être utilisées conjointement avec d'autres colonnes du par groupe(). L'exemple ci-dessous compte le nombre de véhicules par type dans une fenêtre de 5 minutes.

Fenêtre temporelle glissante — Flexibilisation sur les intervalles de temps

Les fenêtres horaires glissantes sont une flexibilisation des fenêtres tumultueuses. Au lieu de créer des intervalles qui ne se chevauchent pas, ils permettent de définir la fréquence à laquelle chaque intervalle sera créé.

Par exemple, toutes les 5 minutes, comptez le nombre de véhicules détectés au cours des 30 dernières minutes.

De ce fait, les événements peuvent appartenir à plusieurs intervalles et être comptés autant de fois que nécessaire.

Pour définir une fenêtre glissante, il suffit de transmettre l'intervalle de mise à jour au fenêtre() une fonction.

Voyons le résultat.

Comme nous pouvons le voir, nous avons des fenêtres de 30 minutes créées toutes les 5 minutes.

Cette flexibilité peut être très utile pour définir des règles métier plus spécifiques et des déclencheurs plus complexes. Par exemple, notre détecteur d'embouteillages peut envoyer des réponses toutes les 5 secondes sur les 10 dernières minutes et créer une alerte lorsque la vitesse moyenne de la voiture descend en dessous de 20 km/h.

Il s'agissait d'un aperçu rapide des principaux concepts de Spark Structured Streaming et de la manière dont ils peuvent être appliqués avec Kafka.

Apache Kafka et Apache Spark sont à la fois des outils fiables et robustes utilisés par de nombreuses entreprises pour traiter quotidiennement des quantités incroyables de données, ce qui en fait l'une des paires les plus solides dans la tâche de traitement de flux.

Nous avons appris à remplir, consommer et traiter des sujets Kafka à l'aide de tâches Spark. Ce n'était pas une tâche difficile, comme mentionné dans l'article, l'API de traitement de flux est presque égale à l'API par lots habituelle, avec juste quelques ajustements mineurs.

Nous avons également discuté des différents modes de sortie, de quelque chose de spécifique aux applications de streaming et de la manière dont chacun peut être utilisé. Enfin, nous avons exploré les agrégations avec des fenêtres temporelles, l'une des principales fonctionnalités du traitement de flux.

Encore une fois, il ne s'agissait que d'un aperçu rapide, et je laisserai quelques références ci-dessous si vous souhaitez explorer plus en profondeur.

J'espère avoir aidé d'une manière ou d'une autre, merci d'avoir lu ! 🙂

Tout le code est disponible dans ce GitHub référentiel.
Données utilisées —
Contagènes volumétriques de radars, données ouvertes, gouverneur brésilien.

Présentation approfondie des fonctionnalités : filigrane dans le streaming structuré Apache Spark — Max Fisher sur le blog Databricks
[2] Chambers, B. et Zaharia, M. (2018). Spark : Le guide définitif : Le traitement du Big Data simplifié. «O'Reilly Media, Inc.».
Logistique, expédition et transport en temps réel avec Apache Kafka-Kai Waehner
Avec Apache Kafka dans Netflix Studio et Finance World — Blog Confluent
[5] Spark Streaming et Kafka — https://sparkbyexamples.com/

Un aperçu rapide de Spark Structured Streaming + Kafka republié à partir de la source https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Horodatage:

Plus de Consultants en blockchain