En rask titt på Spark Structured Streaming + Kafka

Lær det grunnleggende om hvordan du bruker denne kraftige duoen til strømbehandlingsoppgaver

Photo by Nikhita Singhal on Unsplash

Nylig begynte jeg å studere mye om Apache Kafka og Apache Spark, to ledende teknologier innen datateknikkverdenen.

Jeg har laget flere prosjekter med dem de siste månedene; "Maskinlæring Streaming med Kafka, Debezium og BentoML" er et eksempel. Mitt fokus er å lære å lage kraftige datapipelines med disse moderne kjente verktøyene og få en følelse av fordeler og ulemper.

I løpet av de siste månedene har jeg allerede dekket hvordan man lager ETL-rørledninger ved å bruke begge verktøyene, men aldri bruke dem sammen, og det er gapet jeg skal fylle i dag.

Målet vårt er å lære den generelle ideen bak å bygge en strømmeapplikasjon med Spark+Kafka og gi en rask titt på hovedkonseptene ved hjelp av ekte data.

Ideen er enkel - Apache Kafka er et verktøy for meldingsstrømming, der produsenter skriver meldinger i den ene enden av en kø (kalt en Tema) for å bli lest av forbrukere på den andre.

Men det er et veldig komplekst verktøy, bygget for å være en spenstig distribuert meldingstjeneste, med alle slags leveringsgarantier (nøyaktig én gang, én gang, hvilken som helst), meldingslagring og meldingsreplikering, samtidig som den tillater fleksibilitet, skalerbarhet og høy gjennomstrømning. Den har et bredere sett med brukstilfeller, som kommunikasjon med mikrotjenester, hendelsessystemer i sanntid og streaming av ETL-rørledninger.

Apache Spark er en distribuert minnebasert datatransformasjonsmotor.

Det er også et veldig komplekst verktøy som kan kobles til alle slags databaser, filsystemer og skyinfrastruktur. Den er rettet til å operere i distribuerte miljøer for å parallellisere prosessering mellom maskiner, og oppnå høyytelsestransformasjoner ved å bruke sin late evalueringsfilosofi og spørringsoptimaliseringer.

Den kule delen med det er at ved slutten av dagen er koden bare din vanlige SQL-spørring eller (nesten) Python+pandas-skriptet ditt, med alt hekseri abstrahert under en fin brukervennlig API på høyt nivå.

Bli med disse to teknologiene og vi har en perfekt match for å bygge en streaming ETL-pipeline.

Vi vil bruke dataene fra trafikksensorer i byen Belo Horizonte (BH), hovedstaden i Minas Gerais (Brasil). Det er et enormt datasett som inneholder målinger av trafikkflyt flere steder i byen. Hver sensor oppdager med jevne mellomrom typen kjøretøy som kjører på det stedet (bil, motorsykkel, buss/lastebil), hastighet og lengde (og annen informasjon som vi ikke kommer til å bruke).

Dette datasettet representerer nettopp en av de klassiske applikasjonene for strømmesystemer - en gruppe sensorer som sender avlesningene sine kontinuerlig fra feltet.

I dette scenariet kan Apache Kafka brukes som et abstraksjonslag mellom sensorene og applikasjonene som bruker dataene deres.

Kafka brukt som et abstraksjonslag mellom kilder og tjenester. Bilde av forfatter.

Med denne typen infrastruktur er det mulig å bygge alle slags (det såkalte) hendelsesdrevne systemer i sanntid, som et program for å oppdage og varsle om trafikkork når antall kjøretøy plutselig øker med et fall i gjennomsnittshastigheten.

Og det er her Apache Spark kommer inn i bildet.

Den har en innebygd modul for strømbehandling kalt Spark Structured Streaming, som kan koble til Kafka og behandle meldingene.

Sette opp miljøet

Alt du trenger er docker og docker-compose.

Vi bruker en docker-compose-filkonfigurasjon basert på følgende depoter: link gnist, lenke kafka.

De ./src volum er der vi skal legge skriptene våre.

For å starte miljøet, bare løp

docker-komponere opp

All koden er tilgjengelig i denne GitHub repository.

En av tingene jeg likte best når jeg begynte å studere Spark, var likheten mellom skrevet kode for det og mine vanlige python+pandas-skript. Det var veldig enkelt å migrere.

Etter samme logikk er Sparks strømmemodul veldig lik den vanlige gnistkoden, noe som gjør det enkelt å migrere fra batchapplikasjonene til strømme.

Med det sagt, vil vi i de følgende delene fokusere på å lære spesifisitetene til Spark-strukturert streaming, dvs. hvilke nye funksjoner den har.

Vår første jobb

La oss starte sakte og bygge et lekeeksempel

Den første tingen å gjøre er å lage et Kafka-emne der gnistjobben vår vil konsumere meldingene.

Dette gjøres av tilgang til Kafka containerterminal og utfører:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

For å simulere en produsent som skriver meldinger om dette emnet, la oss bruke kafka-konsoll-produsent. Også inne i beholderen:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Fra nå av vil hver linje som skrives i terminalen bli sendt som en melding til testemnet. Tegnet ":" brukes til å skille meldingens nøkkel og verdi (nøkkel:verdi).

La oss lage en Spark-jobb for å konsumere dette emnet.

Koden må legges inn i /src/streaming mappe (ikke noe spesielt, bare mappen jeg valgte).

Det viktigste å merke seg er at vi bruker attributtene readStream og writeStream, i stedet for vanlig lesing og skriving. Dette er hovedaspektet som gjør at Spark behandler jobben vår som en strømmeapplikasjon.

For å koble til Kafka er det nødvendig å spesifisere serveren og emnet. Valget starterOffsets=“tidligst» forteller Spark å lese emnet fra begynnelsen. Også fordi Kafka lagrer meldingene sine i binære form, må de dekodes til string.

De andre alternativene vil bli nærmere utforsket.

La oss nå få tilgang til Spark-beholderen og kjøre jobben.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Etter noen sekunder med konfigurasjon vil den begynne å konsumere emnet.

Gnistoppslukende meldinger fra Kafka. Bilde av forfatter.

Spark Streaming fungerer i mikro-batching modus, og det er derfor vi ser "batch"-informasjonen når den bruker meldingene.

Micro-batching er noe mellom full "ekte" streaming, der alle meldingene behandles individuelt når de ankommer, og den vanlige batchen, hvor dataene forblir statiske og forbrukes på forespørsel. Spark vil vente litt med å prøve å samle meldinger for å behandle dem sammen, redusere overhead og øke ventetiden. Dette kan tilpasses dine behov.

Jeg er ikke en superrask type, så Spark behandler meldingen før jeg kan inkludere nye i gjeldende batch.

Og det var vår første strømmejobb!

Jeg håper du får følelsen: det er ikke vanskelig å kode en strømbehandlingsjobb, men det er noen gotchas.

Skrive data til en Kafka-strøm

Nå er det på tide å begynne å leke med sensordataene.

Du kan laste ned zip fil fra AUGUST 2022 og pakk den ut i / data volum. Dataene er opprinnelig i JSON og tar rundt 23 Gb plass. Det første du må gjøre er å konvertere den til parkett for å optimalisere diskplass og lesetid.

Gnistjobbene for å gjøre dette er detaljert i GitHub-depotet, alt du trenger å gjøre er å utføre dem:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Avhengig av maskinen din, kan utførelsen ta litt tid. Men det lønner seg, den endelige parkettfilstørrelsen er ~1Gb (mer enn 20x mindre) og mye raskere å lese.

Vi må også lage Kafka-emnet for å motta meldingene våre:

kafka-topics.sh --create --replikeringsfaktor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Eventuelt, hvis du ønsker å vise de ankommende meldingene, er det mulig å sette opp en konsollforbruker.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Å skrive data om et Kafka-emne er enkelt, men har noen detaljer.

I strukturert strømming er standard oppførsel å ikke prøve å utlede dataskjemaet (kolonner og deres typer), så vi må sende en.

Kafka-meldinger er bare nøkkelverdi-binære strengpar, så vi må representere dataene våre i dette formatet. Dette kan enkelt oppnås ved å konvertere alle rader til JSON-strenger, kode dem i binær, og lagre resultatet i "verdi"-kolonnen.

Transformere kolonner til JSON-strenger. Bilde av forfatter.

Meldingsnøkler er veldig viktige i Kafka, men de vil ikke være nyttige i testene våre, så alle meldinger vil ha det samme.

Som nevnt før, er dette datasettet STORT, så jeg begrenset antall meldinger som ble satt inn til 500,000 XNUMX.

Til slutt passerer vi Kafka-serveren og emnet og en "sjekkpunktPlassering” hvor gnisten vil lagre utførelsesfremdriften, nyttig for å gjenopprette fra feil.

Utførelse av jobben:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Setter inn data i Kafka. Bilde av forfatter.

Til venstre leser Spark-jobben filen, til høyre, en kafka-konsoll-forbruker viser de ankommende meldingene.

Vårt trafikkemne er fylt ut og nesten klart til å behandles.

Det er viktig å huske at vi brukte en gnistjobb for å fylle ut emnet vårt bare for læringsformål. I et reelt scenario vil sensorene selv sende avlesninger direkte til Kafka.

For å simulere denne dynamiske oppførselen, skriver skriptet nedenfor 1 rad til emnet hvert 2.5 sekund.

Utgangsmoduser — Teller antall kjøretøy etter type

Gå videre, la oss lage en jobb for å telle antall kjøretøy etter type.

Kolonnen "Classificação" (klassifisering) inneholder kjøretøytypen som er oppdaget.

Mens vi leser fra emnet, må vi konvertere de binære JSON-strengene tilbake til kolonneformatet.

Når dette er gjort, kan spørringen bygges som vanlig. Det er interessant å merke seg at søkehjertet bare er velg🇧🇷gruppe av🇧🇷telle() sekvens, resten er i forhold til strømmelogikk.

Så det er på tide å ta opp utgangsmodus() alternativet.

Utdatamodusen til en strømapplikasjon spesifiserer hvordan vi ønsker å (om)beregne og skrive resultatene etter hvert som nye data kommer.

Den kan anta tre forskjellige verdier:

  • Tilføy: Legg bare til nye poster i utdataene.
  • Komplett: Beregn hele resultatet på nytt for hver ny rekord.
  • Oppdater: Oppdater endrede poster.

Disse modusene kan eller kan ikke gi mening avhengig av søknaden som er skrevet. For eksempel kan det hende at "fullstendig"-modus ikke gir mening hvis noen gruppering eller sortering utføres.

La oss utføre jobben i "fullstendig" modus og se på resultatene.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Lastebil, Automóvel-Car, Indefinido-Undefined, Ônibus-Buss, Moto-Motocycle. Bilde av forfatter.

Etter hvert som nye poster legges inn i strømmen (se terminalen til høyre), beregner jobben hele resultatet på nytt. Dette kan være nyttig i situasjoner der rekkefølge er viktig, som rangering eller konkurranse.

Imidlertid kan denne tilnærmingen ikke være optimal hvis antallet grupper er for stort eller de individuelle endringene ikke påvirker det samlede resultatet.

Så et annet alternativ er å bruke "oppdatering"-utgangsmodusen, som genererer en ny melding bare for gruppene som har endret seg. Se nedenfor:

Spørringen med utdatamodus "oppdatering". Bilde av forfatter.

"Legg til"-modus er ikke tilgjengelig for spørringer med gruppering, så jeg vil ikke kunne vise med samme jobb. Men jeg tror at det er den enkleste modusen, det alltid legger til en ny post i utgangen.

Disse utdatamodusene er enklere å forstå hvis du tenker på å lagre resultatene i en tabell. I fullstendig utdatamodus vil tabellen skrives om for hver ny melding som behandles, i oppdateringsmodus, bare linjene der en oppdatering skjedde, og vedlegget vil alltid legge til en ny linje på slutten.

Tumbling tidsvindu — Aggregering ved hjelp av tidsintervaller

I strømmesystemer har meldinger to forskjellige tidsstempler knyttet til seg: Hendelsestid — Tiden da meldingen ble opprettet, i vårt tilfelle sensorens lesetid, og Behandlingstid — Når meldingen leses av behandlingsagenten, i vårt tilfelle når den når Spark.

En viktig funksjon ved strømbehandlingsverktøy er muligheten til å håndtere hendelsestidsbehandling. Tumbling-vinduer er ikke-overlappende faste tidsintervaller som brukes til å lage aggregeringer ved bruk av hendelsestidskolonner. For å si det enklere deler de opp tidslinjen i like store stykker slik at hver hendelse tilhører et enkelt intervall.

For eksempel, tell, hvert 5. minutt, hvor mange kjøretøy som ble oppdaget de siste 5 minuttene.

5 min veltevindu. Bilde av forfatter.

Koden nedenfor illustrerer dette:

Denne typen behandling kan være svært nyttig i mange situasjoner. Går tilbake til trafikkorkdetektoren som ble foreslått tidligere, er en mulig tilnærming å måle kjøretøyenes gjennomsnittshastighet i et 10-minutters vindu og se om den er under en viss terskel.

Hendelsestidsbehandling er et komplekst tema. Alt kan skje når du håndterer det, som at meldinger går tapt, kommer for sent eller går ut av drift. Spark har flere mekanismer for å prøve å dempe problemene, som vannmerker, som vi ikke vil fokusere på.

Tidsvinduer kan også brukes sammen med andre kolonner i gruppe av(). Eksemplet nedenfor teller antall kjøretøy etter type i et 5min vindu.

Glidende tidsvindu — Fleksibilisering av tidsintervallene

Skyvetidsvinduer er en fleksibilisering av rullevinduer. I stedet for å lage ikke-overlappende intervaller, tillater de å definere hvor ofte hvert intervall skal opprettes.

For eksempel, hvert 5. minutt, tell hvor mange kjøretøy som ble oppdaget i løpet av de siste 30 minuttene.

På grunn av det kan hendelser tilhøre mange intervaller og telles så mange ganger som nødvendig.

For å definere et skyvevindu, send bare oppdateringsintervallet til vindu() funksjon.

La oss se resultatet.

Som vi kan se, har vi 30 minutter vinduer som opprettes hvert 5. minutt.

Denne fleksibiliteten kan være ganske nyttig for å definere mer spesifikke forretningsregler og mer komplekse utløsere. For eksempel kan trafikkorkdetektoren vår sende svar hvert 5. sekund de siste 10 minuttene og lage et varsel når gjennomsnittlig bilhastighet faller under 20 km/t.

Dette var en rask titt på hovedkonseptene til Spark Structured Streaming og hvordan de kan brukes med Kafka.

Apache Kafka og Apache Spark er både pålitelige og robuste verktøy som brukes av mange selskaper til daglig å behandle utrolige mengder data, noe som gjør dem til et av de sterkeste parene i strømbehandlingsoppgaven.

Vi har lært hvordan du fyller ut, konsumerer og behandler Kafka-emner ved å bruke Spark-jobber. Dette var ingen vanskelig oppgave, som nevnt i innlegget, strømbehandlings-APIet er nesten likt det vanlige batch-API, med bare noen mindre justeringer.

Vi har også diskutert forskjellige utgangsmoduser, noe spesifikt for strømmeapplikasjoner, og hvordan hver enkelt kan brukes. Sist, men ikke minst, utforsket vi aggregeringer med tidsvinduer, en av hovedfunksjonene til strømbehandling.

Igjen, dette var bare et raskt blikk, og jeg vil legge igjen noen referanser nedenfor hvis du vil utforske dypere.

Håper jeg har hjulpet på en eller annen måte, takk for at du leser! 🙂

All koden er tilgjengelig i denne GitHub repository.
Data brukt -
Contagens Volumétricas de Radares, Åpne data, brasiliansk guvernør

[1] Feature Deep Dive: Vannmerking i Apache Spark Structured Streaming — Max Fisher på Databricks-bloggen
[2] Chambers, B., & Zaharia, M. (2018). Spark: Den definitive guiden: Behandling av store data på en enkel måte. "O'Reilly Media, Inc.".
[3] Sanntidslogistikk, frakt og transport med Apache Kafka– Kai Waehner
[4] Med Apache Kafka i Netflix Studio og Finance World — Sammenflytende blogg
[5] Spark Streaming og Kafka — https://sparkbyexamples.com/

En rask titt på Spark Structured Streaming + Kafka publisert på nytt fra kilde https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Tidstempel:

Mer fra Blockchain-konsulenter