En snabb titt på Spark Structured Streaming + Kafka

Lär dig grunderna i hur du använder denna kraftfulla duo för strömbearbetningsuppgifter

Foto: Nikhita Singhal on Unsplash

Nyligen började jag studera mycket om Apache Kafka och Apache Spark, två ledande teknologier inom datateknikvärlden.

Jag har gjort flera projekt med hjälp av dem under de senaste månaderna; "Maskininlärning Streaming med Kafka, Debezium och BentoML” är ett exempel. Mitt fokus är att lära mig hur man skapar kraftfulla datapipelines med dessa moderna kända verktyg och få en känsla av deras fördelar och nackdelar.

Under de senaste månaderna har jag redan täckt hur man skapar ETL-pipelines med båda verktygen men aldrig använda dem tillsammans, och det är det tomrum jag kommer att fylla idag.

Vårt mål är att lära sig den allmänna idén bakom att bygga en streamingapplikation med Spark+Kafka och ge en snabb titt på dess huvudkoncept med hjälp av verklig data.

Tanken är enkel — Apache Kafka är ett verktyg för meddelandeströmning, där producenter skriver meddelanden i ena änden av en kö (kallas en ämne) att läsas av konsumenterna å andra sidan.

Men det är ett mycket komplext verktyg, byggt för att vara en motståndskraftig distribuerad meddelandetjänst, med alla möjliga leveransgarantier (exakt en gång, en gång, vilken som helst), meddelandelagring och meddelandereplikering, samtidigt som det tillåter flexibilitet, skalbarhet och hög genomströmning. Den har en bredare uppsättning användningsfall, som kommunikation med mikrotjänster, händelsesystem i realtid och strömmande ETL-pipelines.

Apache Spark är en distribuerad minnesbaserad datatransformationsmotor.

Det är också ett mycket komplext verktyg, som kan ansluta till alla typer av databaser, filsystem och molninfrastruktur. Den är inriktad på att arbeta i distribuerade miljöer för att parallellisera bearbetning mellan maskiner och uppnå högpresterande transformationer genom att använda sin lata utvärderingsfilosofi och frågeoptimeringar.

Det coola med det är att koden i slutet av dagen bara är din vanliga SQL-fråga eller (nästan) ditt Python+panda-skript, med all häxkonst abstraherad under ett trevligt användarvänligt högnivå-API.

Gå med i dessa två tekniker och vi har en perfekt match för att bygga en ETL-pipeline för streaming.

Vi kommer att använda data från trafiksensorer i staden Belo Horizonte (BH), huvudstaden i Minas Gerais (Brasilien). Det är en enorm datauppsättning som innehåller mätningar av trafikflödet på flera platser i staden. Varje sensor upptäcker periodiskt vilken typ av fordon som kör på den platsen (bil, motorcykel, buss/lastbil), dess hastighet och längd (och annan information som vi inte kommer att använda).

Denna datauppsättning representerar just en av de klassiska applikationerna för streamingsystem - en grupp sensorer som skickar sina avläsningar kontinuerligt från fältet.

I det här scenariot kan Apache Kafka användas som ett abstraktionslager mellan sensorerna och applikationerna som förbrukar deras data.

Kafka användes som ett abstraktionslager mellan källor och tjänster. Bild av författare.

Med den här typen av infrastruktur är det möjligt att bygga alla möjliga (de så kallade) händelsedrivna system i realtid, som ett program för att upptäcka och varna för trafikstockningar när antalet fordon plötsligt ökar med en minskning av medelhastigheten.

Och det är där Apache Spark kommer in i bilden.

Den har en inbyggd modul för strömbehandling som kallas Spark Structured Streaming, som kan ansluta till Kafka och bearbeta dess meddelanden.

Ställa in miljön

Allt du behöver är docker och docker-compose.

Vi kommer att använda en docker-compose-filkonfiguration baserad på följande förråd: länk gnista, länk kafka.

Smakämnen ./src volymen är där vi ska lägga våra manus.

För att starta miljön är det bara att springa

docker-komponera upp

All kod finns tillgänglig i denna GitHub repository.

En av de saker som jag gillade mest när jag började studera Spark var likheten mellan skriven kod för den och mina vanliga python+panda-skript. Det var väldigt lätt att migrera.

Enligt samma logik är Sparks strömningsmodul mycket lik den vanliga gnistkoden, vilket gör det enkelt att migrera från batchapplikationerna till strömningsapplikationerna.

Med det sagt, i de följande avsnitten kommer vi att fokusera på att lära oss de specifika egenskaperna hos Spark-strukturerad streaming, dvs vilka nya funktioner den har.

Vårt första jobb

Låt oss börja långsamt och bygga ett leksaksexempel

Det första du ska göra är att skapa ett Kafka-ämne där vårt sparkjobb kommer att förbruka meddelandena.

Detta görs av åtkomst till Kafkas containerterminal och kör:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

För att simulera en producent som skriver meddelanden om detta ämne, låt oss använda kafka-konsol-producent. Även inuti behållaren:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Från och med nu kommer varje rad som skrivs i terminalen att skickas som ett meddelande till testämnet. Tecknet ":" används för att separera meddelandets nyckel och värde (nyckel:värde).

Låt oss skapa ett Spark-jobb för att konsumera detta ämne.

Koden måste läggas in i /src/streaming mapp (inget speciellt, bara mappen som jag valde).

Det viktigaste att notera är att vi använder attributen läsström och skrivström, istället för att läsa och skriva normalt. Detta är huvudaspekten som gör att Spark behandlar vårt jobb som en streamingapplikation.

För att ansluta till Kafka är det nödvändigt att ange servern och ämnet. Alternativet startOffsets=“tidigast” säger till Spark att läsa ämnet från början. Dessutom för att Kafka lagrar sina meddelanden i binär form måste de avkodas till sträng.

De andra alternativen kommer att undersökas ytterligare.

Låt oss nu komma åt Spark-behållaren och köra jobbet.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Efter några sekunders konfiguration kommer det att börja konsumera ämnet.

Gnistförtärande meddelanden från Kafka. Bild av författare.

Spark Streaming fungerar i mikro-batching läge, och det är därför vi ser "batch"-informationen när den förbrukar meddelandena.

Micro-batching är något mellan full "true" streaming, där alla meddelanden behandlas individuellt när de anländer, och den vanliga batchen, där data förblir statisk och konsumeras på begäran. Spark kommer att vänta ett tag med att försöka samla meddelanden för att bearbeta dem tillsammans, vilket minskar overhead och ökar latensen. Detta kan anpassas efter dina behov.

Jag är ingen supersnabb typ, så Spark bearbetar meddelandet innan jag kan inkludera nya i den aktuella batchen.

Och det var vårt första streamingjobb!

Jag hoppas att du får känslan: det är inte svårt att koda ett strömbearbetningsjobb, men det finns några gotchas.

Att skriva data till en Kafka-ström

Nu är det dags att börja leka med sensordata.

Du kan ladda ner zip fil från AUGUSTI 2022 och extrahera den i / data volym. Data är ursprungligen i JSON och tar cirka 23 Gb utrymme. Det första du ska göra är att konvertera den till parkett för att optimera diskutrymme och lästid.

Gnistjobben för att göra detta är detaljerade i GitHub-förvaret, allt du behöver göra är att köra dem:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Beroende på din maskin kan körningen ta lite tid. Men det lönar sig, den slutliga parkettfilstorleken är ~1Gb (mer än 20x mindre) och mycket snabbare att läsa.

Vi måste också skapa Kafka-ämnet för att ta emot våra meddelanden:

kafka-topics.sh --create --replikeringsfaktor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Om du vill visa de inkommande meddelanden är det möjligt att konfigurera en konsolkonsument.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Det är enkelt att skriva data om ett Kafka-ämne, men det har vissa detaljer.

I strukturerad streaming är standardbeteendet att inte försöka härleda dataschemat (kolumner och deras typer), så vi måste skicka ett.

Kafka-meddelanden är bara nyckel-värde binära strängpar, så vi måste representera vår data i detta format. Detta kan enkelt uppnås genom att konvertera alla rader till JSON-strängar, koda dem i binärt och lagra resultatet i kolumnen "värde".

Omvandla kolumner till JSON-strängar. Bild av författare.

Meddelandenycklar är mycket viktiga i Kafka, men de kommer inte att vara användbara i våra tester, så alla meddelanden kommer att ha samma.

Som nämnts tidigare är denna datauppsättning ENORM, så jag begränsade antalet infogade meddelanden till 500,000 XNUMX.

Slutligen passerar vi Kafka-servern och ämnet och en "checkpointLocation” där gnistan kommer att lagra exekveringsförloppet, användbart för att återställa från fel.

Utför jobbet:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Infogar data i Kafka. Bild av författare.

Till vänster läser Spark-jobbet filen, till höger en kafka-konsol-konsument visar de inkommande meddelandena.

Vårt trafikämne är ifyllt och nästan redo att behandlas.

Det är viktigt att komma ihåg att vi använde ett sparkjobb för att fylla i vårt ämne enbart i lärande syfte. I ett verkligt scenario kommer sensorerna själva att skicka avläsningar direkt till Kafka.

För att simulera detta dynamiska beteende skriver skriptet nedan 1 rad till ämnet var 2.5 sekund.

Utgångslägen — Räknar antalet fordon efter typ

Gå vidare, låt oss skapa ett jobb för att räkna antalet fordon efter typ.

Kolumnen "Classificação" (Klassificering) innehåller den upptäckta fordonstypen.

När vi läser från ämnet måste vi konvertera de binära JSON-strängarna tillbaka till kolumnformatet.

När detta är gjort kan frågan byggas som vanligt. Det är intressant att notera att frågans hjärta bara är välj().Grupp av().räkna() sekvens, resten är i förhållande till strömningslogik.

Så det är dags att ta itu med outputMode() alternativ.

Utdataläget för en strömapplikation anger hur vi vill (om)beräkna och skriva resultaten när ny data kommer in.

Den kan anta tre olika värden:

  • Bifoga: Lägg bara till nya poster i utgången.
  • Komplett: Beräkna om det fullständiga resultatet för varje nytt rekord.
  • Uppdatering: Uppdatera ändrade poster.

Dessa lägen kan eller kan inte vara vettiga beroende på applikationen som skrivits. Till exempel kanske läget "komplett" inte är meningsfullt om någon gruppering eller sortering utförs.

Låt oss utföra jobbet i "komplett" läge och titta på resultaten.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Lastbil, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Bild av författare.

När nya poster infogas i strömmen (se terminalen till höger), beräknar jobbet om hela resultatet. Detta kan vara användbart i situationer där radordning är viktig, som rankning eller tävling.

Men detta tillvägagångssätt kanske inte är optimalt om antalet grupper är för stort eller om de individuella förändringarna inte påverkar det totala resultatet.

Så ett annat alternativ är att använda utgångsläget "uppdatering", som genererar ett nytt meddelande endast för de grupper som har ändrats. Se nedan:

Frågan med utdataläge "uppdatering". Bild av författare.

Läget "lägg till" är inte tillgängligt för frågor med gruppering, så jag kommer inte att kunna visa med samma jobb. Men jag tror att det är det enklaste läget, det alltid lägger till en ny post i utgången.

Dessa utdatalägen är enklare att förstå om du funderar på att spara resultaten i en tabell. I det fullständiga utdataläget kommer tabellen att skrivas om för varje nytt meddelande som bearbetas, i uppdateringsläget, bara de rader där någon uppdatering inträffade, och tillägget kommer alltid att lägga till en ny rad i slutet.

Tumbling tidsfönster — Aggregering med tidsintervall

I streamingsystem har meddelanden två olika tidsstämplar relaterade till dem: Händelsetid — Tiden då meddelandet skapades, i vårt fall sensorns avläsningstid, och Behandlingstid — När meddelandet läses av bearbetningsagenten, i vårt fall då den når Spark.

En viktig egenskap hos strömbearbetningsverktyg är förmågan att hantera händelsetidsbearbetning. Tumbling windows är icke-överlappande fasta tidsintervall som används för att göra aggregering med hjälp av händelsetidskolumner. För att uttrycka det enklare delar de upp tidslinjen i lika stora delar så att varje händelse tillhör ett enda intervall.

Till exempel, räkna, var 5:e minut, hur många fordon som upptäckts under de senaste 5 minuterna.

5 min tumlande fönster. Bild av författare.

Koden nedan illustrerar detta:

Denna typ av bearbetning kan vara extremt användbar i många situationer. För att gå tillbaka till den tidigare föreslagna trafikstockningsdetektorn, är ett möjligt tillvägagångssätt att mäta fordonens medelhastighet i ett 10-minutersfönster och se om den ligger under en viss tröskel.

Händelse-tidsbearbetning är ett komplext ämne. Allt kan hända när man hanterar det, som att meddelanden går förlorade, kommer för sent eller går ur funktion. Spark har flera mekanismer för att försöka mildra problemen, som vattenmärken, som vi inte kommer att fokusera på.

Tidsfönster kan också användas tillsammans med andra kolumner i Grupp av(). Exemplet nedan räknar antalet fordon per typ i ett 5min fönster.

Glidande tidsfönster — Flexibilisering av tidsintervallen

Glidande tidsfönster är en flexibilisering av tumlande fönster. Istället för att skapa icke-överlappande intervall, tillåter de att definiera hur ofta varje intervall ska skapas.

Till exempel, var 5:e minut, räkna hur många fordon som upptäckts under de senaste 30 minuterna.

På grund av det kan händelser tillhöra många intervaller och räknas så många gånger som behövs.

För att definiera ett glidande fönster, skicka bara uppdateringsintervallet till fönster() funktion.

Låt oss se resultatet.

Som vi kan se har vi 30 min fönster som skapas varje 5 min.

Denna flexibilitet kan vara mycket användbar för att definiera mer specifika affärsregler och mer komplexa triggers. Till exempel kan vår trafikstockningsdetektor skicka svar var 5:e sekund under de senaste 10 minuterna och skapa en varning när den genomsnittliga bilhastigheten sjunker under 20 km/h.

Detta var en snabb titt på huvudkoncepten för Spark Structured Streaming och hur de kan tillämpas med Kafka.

Apache Kafka och Apache Spark är både pålitliga och robusta verktyg som används av många företag för att dagligen bearbeta otroliga mängder data, vilket gör dem till ett av de starkaste paren i strömbearbetningsuppgiften.

Vi har lärt oss hur man fyller i, konsumerar och bearbetar Kafka-ämnen med hjälp av Spark-jobb. Detta var ingen svår uppgift, som nämnts i inlägget, strömbehandlings-API:et är nästan lika med det vanliga batch-API:et, med bara några mindre justeringar.

Vi har också diskuterat olika utmatningslägen, något specifikt för streamapplikationer och hur var och en kan användas. Sist men inte minst utforskade vi aggregering med tidsfönster, en av huvudfunktionerna för strömbehandling.

Återigen, detta var bara en snabb titt, och jag lämnar några referenser nedan om du vill utforska djupare.

Hoppas jag har hjälpt till på något sätt, tack för att du läser! 🙂

All kod finns tillgänglig i denna GitHub repository.
Data som används —
Contagens Volumétricas de Radares, öppen data, brasiliansk regering

[1] Feature Deep Dive: Vattenmärkning i Apache Spark Structured Streaming — Max Fisher på Databricks blogg
[2] Chambers, B., & Zaharia, M. (2018). Spark: Den definitiva guiden: Bearbetning av stora data på ett enkelt sätt. "O'Reilly Media, Inc.".
[3] Logistik, frakt och transport i realtid med Apache Kafka– Kai Waehner
[4] Med Apache Kafka i Netflix Studio och Finance World — Konfluent blogg
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

En snabb titt på Spark Structured Streaming + Kafka återpublicerad från källa https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Tidsstämpel:

Mer från Blockchain-konsulter