Uno sguardo veloce allo streaming strutturato Spark + Kafka

Apprendimento delle nozioni di base su come utilizzare questo potente duo per attività di elaborazione del flusso

Foto di Nikhita Singhal on Unsplash

Recentemente ho iniziato a studiare molto su Apache Kafka e Apache Spark, due tecnologie leader nel mondo dell'ingegneria dei dati.

Ho realizzato diversi progetti utilizzandoli negli ultimi mesi; “Streaming di machine learning con Kafka, Debezium e BentoML" è un esempio. Il mio obiettivo è imparare come creare potenti pipeline di dati con questi famosi strumenti moderni e avere un'idea dei loro vantaggi e svantaggi.

Negli ultimi mesi ho già spiegato come creare pipeline ETL utilizzando entrambi gli strumenti ma senza mai usarli insieme, ed è questa la lacuna che colmerò oggi.

Il nostro obiettivo è apprendere l'idea generale alla base della creazione di un'applicazione di streaming con Spark+Kafka e dare uno sguardo rapido ai suoi concetti principali utilizzando dati reali.

L'idea è semplice: Apache Kafka è uno strumento di streaming di messaggi, in cui i produttori scrivono messaggi all'estremità di una coda (chiamata argomento) per essere letti dai consumatori dall'altro.

Ma è uno strumento molto complesso, costruito per essere un servizio di messaggistica distribuito resiliente, con tutti i tipi di garanzie di consegna (esattamente una volta, una volta, qualsiasi), archiviazione e replica dei messaggi, consentendo allo stesso tempo flessibilità, scalabilità e throughput elevato. Ha una serie più ampia di casi d'uso, come la comunicazione di microservizi, sistemi di eventi in tempo reale e pipeline ETL in streaming.

Apache Spark è un motore di trasformazione dei dati basato sulla memoria distribuita.

È anche uno strumento molto complesso, in grado di connettersi con tutti i tipi di database, file system e infrastruttura cloud. È progettato per operare in ambienti distribuiti per parallelizzare l'elaborazione tra macchine, ottenendo trasformazioni ad alte prestazioni utilizzando la sua filosofia di valutazione pigra e l'ottimizzazione delle query.

La parte interessante è che, alla fine, il codice è solo la solita query SQL o (quasi) il tuo script Python+panda, con tutta la stregoneria astratta sotto una bella API di alto livello user-friendly.

Unisci queste due tecnologie e avremo una combinazione perfetta per costruire una pipeline ETL in streaming.

Utilizzeremo i dati dei sensori di traffico della città di Belo Horizonte (BH), capitale del Minas Gerais (Brasile). Si tratta di un enorme set di dati contenente misurazioni del flusso di traffico in diversi punti della città. Ogni sensore rileva periodicamente il tipo di veicolo che circola in quella posizione (auto, moto, autobus/camion), la sua velocità e la sua lunghezza (e altre informazioni che non utilizzeremo).

Questo set di dati rappresenta proprio una delle applicazioni classiche per i sistemi di streaming: un gruppo di sensori che inviano continuamente le loro letture dal campo.

In questo scenario, Apache Kafka può essere utilizzato come livello di astrazione tra i sensori e le applicazioni che consumano i loro dati.

Kafka utilizzato come livello di astrazione tra fonti e servizi. Immagine dell'autore.

Con questo tipo di infrastrutture è possibile costruire ogni tipo di (cosiddetto) sistemi guidati da eventi in tempo reale, come un programma per rilevare e avvisare degli ingorghi quando il numero di veicoli aumenta improvvisamente con un calo della velocità media.

Ed è qui che entra in gioco Apache Spark.

Ha un modulo nativo per l'elaborazione del flusso chiamato Streaming strutturato Spark, che può connettersi a Kafka ed elaborarne i messaggi.

Configurare l'ambiente

Tutto ciò di cui hai bisogno è docker e docker-compose.

Utilizzeremo una configurazione di file docker-compose basata sui seguenti repository: scintilla di collegamento, collegamento Kafka.

I ./src volume è dove inseriremo i nostri script.

Per avviare l'ambiente, basta eseguire

docker-comporre up

Tutto il codice è disponibile in questo Repository GitHub.

Una delle cose che mi è piaciuta di più quando ho iniziato a studiare Spark è stata la somiglianza tra il codice scritto e i miei soliti script Python+Panda. È stato molto facile migrare.

Seguendo la stessa logica, il modulo di streaming di Spark è molto simile al solito codice Spark, facilitando la migrazione dalle applicazioni batch a quelle stream.

Detto questo, nelle sezioni seguenti ci concentreremo sull'apprendimento delle specificità dello streaming strutturato Spark, ovvero quali nuove funzionalità ha.

Il nostro primo lavoro

Iniziamo lentamente e costruiamo un esempio di giocattolo

La prima cosa da fare è creare un argomento Kafka da cui il nostro lavoro Spark consumerà i messaggi.

Questo è fatto da accedere al terminal container di Kafka ed eseguendo:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Per simulare un produttore che scrive messaggi su questo argomento, utilizziamo il file kafka-console-produttore. Inoltre all'interno del contenitore:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Da ora in poi, ogni riga digitata nel terminale verrà inviata come messaggio all'argomento del test. Il carattere “:” viene utilizzato per separare la chiave e il valore del messaggio (chiave:valore).

Creiamo un processo Spark per utilizzare questo argomento.

Il codice deve essere inserito all'interno del file /src/streaming cartella (niente di speciale, solo la cartella che ho scelto).

La cosa fondamentale da notare è che stiamo utilizzando gli attributi readStream ed writeStream, invece della normale lettura e scrittura. Questo è l'aspetto principale che fa sì che Spark consideri il nostro lavoro come un'applicazione di streaming.

Per connettersi a Kafka è necessario specificare il server e l'argomento. L'opzione StartingOffsets="più presto" dice a Spark di leggere l'argomento dall'inizio. Inoltre, perché Kafka memorizza i suoi messaggi in binario forma, devono essere decodificati stringa.

Le altre opzioni verranno ulteriormente esplorate.

Ora accediamo al contenitore Spark ed eseguiamo il processo.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Dopo alcuni secondi di configurazione, inizierà a consumare l'argomento.

Spark consumando messaggi da Kafka. Immagine dell'autore.

Spark Streaming funziona in micro-batch modalità, ed è per questo che vediamo le informazioni "batch" quando consuma i messaggi.

Il micro-batching è una via di mezzo tra il “vero” streaming completo, in cui tutti i messaggi vengono elaborati individualmente non appena arrivano, e il solito batch, in cui i dati rimangono statici e vengono consumati su richiesta. Spark attenderà un po' di tempo cercando di accumulare messaggi per elaborarli insieme, riducendo il sovraccarico e aumentando la latenza. Questo può essere adattato alle tue esigenze.

Non scrivo molto velocemente, quindi Spark elabora il messaggio prima di poterne includere di nuovi nel batch corrente.

E quello è stato il nostro primo lavoro in streaming!

Spero che tu abbia la sensazione: non è difficile codificare un lavoro di elaborazione del flusso, ma ci sono alcuni trucchi.

Scrittura di dati in un flusso Kafka

Ora è il momento di iniziare a giocare con i dati dei sensori.

È possibile scaricare il chiusura file da AGOSTO 2022 ed estrailo nel file /dati volume. I dati sono originariamente in JSON e occupano circa 23 GB di spazio. La prima cosa da fare è convertirlo in parquet per ottimizzare lo spazio su disco e i tempi di lettura.

Gli spark job per fare questo sono dettagliati nel repository GitHub, tutto quello che devi fare è eseguirli:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

A seconda del computer, l'esecuzione potrebbe richiedere del tempo. Ma paga, la dimensione finale del file parquet è di ~1 Gb (più di 20 volte più piccolo) e molto più veloce da leggere.

Dobbiamo anche creare l'argomento Kafka per ricevere i nostri messaggi:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Facoltativamente, se si desidera visualizzare i messaggi in arrivo, è possibile impostare una console consumer.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Scrivere dati su un argomento Kafka è facile, ma presenta alcuni dettagli.

Nello streaming strutturato, il comportamento predefinito è quello di non provare a dedurre lo schema dei dati (colonne e relativi tipi), quindi dobbiamo passarne uno.

I messaggi Kafka sono solo coppie di stringhe binarie chiave-valore, quindi dobbiamo rappresentare i nostri dati in questo formato. Ciò può essere facilmente ottenuto convertendo tutte le righe in stringhe JSON, codificandole in binario e memorizzando il risultato nella colonna "valore".

Trasformazione di colonne in stringhe JSON. Immagine dell'autore.

Le chiavi dei messaggi sono molto importanti in Kafka, ma non saranno utili nei nostri test, quindi tutti i messaggi avranno le stesse.

Come accennato in precedenza, questo set di dati è ENORME, quindi ho limitato il numero di messaggi inseriti a 500,000.

Infine, passiamo al server e all'argomento Kafka e a "checkpointLocation” dove lo spark memorizzerà l'avanzamento dell'esecuzione, utile per recuperare da errori.

Esecuzione del lavoro:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Inserimento di dati in Kafka. Immagine dell'autore.

A sinistra, il processo Spark legge il file, a destra, a kafka-console-consumatore visualizza i messaggi in arrivo.

Il nostro argomento sul traffico è popolato e quasi pronto per essere elaborato.

È importante ricordare che abbiamo utilizzato uno spark job per popolare il nostro argomento solo a scopo di apprendimento. In uno scenario reale, i sensori stessi invieranno le letture direttamente a Kafka.

Per simulare questo comportamento dinamico, lo script seguente scrive 1 riga nell'argomento ogni 2.5 secondi.

Modalità di output: conteggio del numero di veicoli per tipo

Andando avanti, creiamo un lavoro per contare il numero di veicoli per tipo.

La colonna “Classificação” (Classificazione) contiene il tipo di veicolo rilevato.

Mentre leggiamo l'argomento, dobbiamo riconvertire le stringhe binarie JSON nel formato colonnare.

Fatto ciò, la query può essere costruita come al solito. È interessante notare che il cuore della query è proprio il select().raggruppa per().contare() sequenza, tutto il resto è relativo alla logica dello streaming.

Quindi è il momento di affrontare il outputMode() opzione.

La modalità di output di un'applicazione stream specifica come vogliamo (ri)calcolare e scrivere i risultati quando arrivano nuovi dati.

Può assumere tre diversi valori:

  • Aggiungere: aggiunge solo nuovi record all'output.
  • Completato: Ricalcola il risultato completo per ogni nuovo record.
  • Aggiornanento: aggiorna i record modificati.

Queste modalità possono o meno avere senso a seconda dell'applicazione scritta. Ad esempio, la modalità "completa" potrebbe non avere senso se viene eseguito un raggruppamento o un ordinamento.

Eseguiamo il lavoro in modalità “completa” e guardiamo i risultati.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Camion, Automóvel-Car, Indefinido-Undefinito, Ônibus-Bus, Moto-Motocycle. Immagine dell'autore.

Man mano che vengono inseriti nuovi record nel flusso (vedere il terminale a destra), il lavoro ricalcola il risultato completo. Ciò può essere utile in situazioni in cui l'ordine delle righe è importante, come la classifica o la competizione.

Tuttavia, questo approccio potrebbe non essere ottimale se il numero di gruppi è troppo grande o se i cambiamenti individuali non influiscono sul risultato complessivo.

Quindi, un'altra opzione è utilizzare la modalità di output "aggiornamento", che genera un nuovo messaggio solo per i gruppi che sono cambiati. Vedi sotto:

La query con modalità di output “aggiornamento”. Immagine dell'autore.

La modalità "aggiungi" non è disponibile per le query con raggruppamento, quindi non sarò in grado di mostrare utilizzando lo stesso lavoro. Ma penso che sia la modalità più semplice sempre aggiunge un nuovo record all'output.

Queste modalità di output sono più semplici da comprendere se pensi di salvare i risultati in una tabella. Nella modalità di output completo, la tabella verrà riscritta per ogni nuovo messaggio elaborato, nella modalità di aggiornamento, solo le righe in cui si è verificato qualche aggiornamento, e l'append aggiungerà sempre una nuova riga alla fine.

Finestra temporale a cascata: aggregazione utilizzando intervalli di tempo

Nei sistemi di streaming, i messaggi hanno due timestamp diversi ad essi correlati: Event time: l'ora in cui il messaggio è stato creato, nel nostro caso il tempo di lettura del sensore, e Processing time: quando il messaggio viene letto dall'agente di elaborazione, nel nostro caso quando raggiunge Spark.

Una caratteristica importante degli strumenti di elaborazione del flusso è la capacità di gestire l'elaborazione del tempo degli eventi. Le finestre a cascata sono intervalli di tempo fissi non sovrapposti utilizzati per creare aggregazioni utilizzando colonne di tempo evento. Per dirla più semplicemente, dividono la sequenza temporale in sezioni di uguali dimensioni in modo che ogni evento appartenga a un singolo intervallo.

Ad esempio, conta, ogni 5 minuti, quanti veicoli sono stati rilevati negli ultimi 5 minuti.

Finestra rotante di 5 minuti. Immagine dell'autore.

Il codice seguente illustra questo:

Questo tipo di elaborazione può essere estremamente utile in molte situazioni. Tornando al rilevatore di ingorghi proposto in precedenza, un possibile approccio è quello di misurare la velocità media dei veicoli in una finestra di 10 minuti e vedere se è al di sotto di una certa soglia.

L'elaborazione del tempo degli eventi è un argomento complesso. Quando si affronta questo problema può succedere di tutto, come la perdita di messaggi, l'arrivo troppo tardi o il disordine. Spark ha diversi meccanismi per cercare di mitigare i problemi, come filigrane, su cui non ci soffermeremo.

Le finestre temporali possono essere utilizzate anche insieme ad altre colonne nel file raggruppa per(). L'esempio seguente conta il numero di veicoli per tipologia in una finestra di 5 minuti.

Finestra temporale scorrevole: flessibilizzazione degli intervalli temporali

Le finestre temporali scorrevoli sono una flessibilizzazione delle finestre cadenti. Invece di creare intervalli non sovrapposti, consentono di definire la frequenza con cui verrà creato ciascun intervallo.

Ad esempio, ogni 5 minuti conta quanti veicoli sono stati rilevati negli ultimi 30 minuti.

Per questo motivo, gli eventi possono appartenere a più intervalli ed essere contati tutte le volte necessarie.

Per definire una finestra scorrevole basta passare l'intervallo di aggiornamento al file finestra() funzione.

Vediamo l'output.

Come possiamo vedere, vengono create finestre di 30 minuti ogni 5 minuti.

Questa flessibilità può essere molto utile per definire regole aziendali più specifiche e trigger più complessi. Ad esempio, il nostro rilevatore di ingorghi può inviare risposte ogni 5 secondi sugli ultimi 10 minuti e creare un avviso quando la velocità media dell'auto scende al di sotto dei 20 km/h.

Questa è stata una rapida occhiata ai concetti principali di Spark Structured Streaming e al modo in cui possono essere applicati con Kafka.

Apache Kafka e Apache Spark sono strumenti affidabili e robusti utilizzati da molte aziende per elaborare quotidianamente quantità incredibili di dati, rendendoli una delle coppie più forti nell'attività di elaborazione dei flussi.

Abbiamo imparato come popolare, utilizzare ed elaborare gli argomenti Kafka utilizzando i processi Spark. Non è stato un compito difficile, come menzionato nel post, l'API di elaborazione del flusso è quasi uguale alla solita API batch, con solo alcune piccole modifiche.

Abbiamo anche discusso delle diverse modalità di output, di qualcosa di specifico per le applicazioni di streaming e di come ciascuna di esse può essere utilizzata. Ultimo ma non meno importante, abbiamo esplorato le aggregazioni con finestre temporali, una delle principali funzionalità dell'elaborazione del flusso.

Ancora una volta, questa è stata solo una rapida occhiata e lascerò alcuni riferimenti di seguito se desideri esplorare più a fondo.

Spero di averti aiutato in qualche modo, grazie per aver letto! 🙂

Tutto il codice è disponibile in questo Repository GitHub.
Dati utilizzati —
Contagens Volumétricas de Radares, Open Data, governatore brasiliano

, Approfondimento sulle funzionalità: filigrana nello streaming strutturato Apache Spark — Max Fisher sul blog di Databricks
[2] Chambers, B. e Zaharia, M. (2018). Spark: la guida definitiva: l'elaborazione dei Big Data resa semplice. “O'Reilly Media, Inc.”.
, Logistica, spedizione e trasporti in tempo reale con Apache Kafka— Kai Waehner
, Con Apache Kafka in Netflix Studio e Finance World — Blog confluente
[5] Spark Streaming e Kafka — https://sparkbyexamples.com/

Uno sguardo veloce a Spark Structured Streaming + Kafka ripubblicato dalla fonte https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 tramite https:/ /towardsdatascience.com/feed

<!–

->

Timestamp:

Di più da Consulenti Blockchain