Kiire pilk Spark Structured Streaming + Kafkale

Õppige põhitõdesid, kuidas seda võimsat duot vootöötlustoimingute jaoks kasutada

Foto: Nikhita Singhal on Unsplash

Hiljuti hakkasin palju õppima Apache Kafka ja Apache Sparki, kahe andmetehnoloogia maailma juhtiva tehnoloogia kohta.

Olen neid kasutades viimastel kuudel teinud mitmeid projekte; “Masinõppe voogesitus Kafka, Debeziumi ja BentoML-iga” on näide. Minu eesmärk on õppida, kuidas luua võimsaid andmekonveiereid nende kaasaegsete kuulsate tööriistadega ning saada aimu nende eelistest ja puudustest.

Viimastel kuudel olen juba käsitlenud, kuidas luua ETL-i torujuhtmeid, kasutades mõlemat tööriista, kuid mitte kunagi neid koos, ja see on tühimik, mille ma täna täidan.

Meie eesmärk on õppida Spark+Kafka abil voogedastusrakenduse loomise üldist ideed ja anda reaalsete andmete abil kiire ülevaade selle põhikontseptsioonidest.

Idee on lihtne – Apache Kafka on sõnumite voogesituse tööriist, kus tootjad kirjutavad sõnumeid järjekorra ühte otsa (nn. teema), mida tarbijad teiselt poolt loevad.

Kuid see on väga keeruline tööriist, mis on loodud olema vastupidav hajutatud sõnumsideteenus, millel on kõikvõimalikud edastamisgarantiid (täpselt üks kord, üks kord, ükskõik milline), sõnumite salvestusruum ja sõnumite replikatsioon, võimaldades samal ajal paindlikkust, skaleeritavust ja suurt läbilaskevõimet. Sellel on laiem kasutusjuhtude kogum, nagu mikroteenuste side, reaalajas sündmuste süsteemid ja voogesituse ETL-i torujuhtmed.

Apache Spark on hajutatud mälupõhine andmete teisendusmootor.

See on ka väga keeruline tööriist, mis võimaldab luua ühenduse kõikvõimalike andmebaaside, failisüsteemide ja pilveinfrastruktuuriga. See on mõeldud töötama hajutatud keskkondades, et paralleelstada masinate vahelist töötlemist, saavutades suure jõudlusega teisendusi, kasutades oma laiska hindamisfilosoofiat ja päringute optimeerimist.

Lahe osa asja juures on see, et päeva lõpuks on kood lihtsalt teie tavaline SQL-päring või (peaaegu) teie Python+pandas skript, kusjuures kogu nõidus on abstraheeritud kena kasutajasõbraliku kõrgetasemelise API all.

Ühendage need kaks tehnoloogiat ja meil on ideaalne valik voogesituse ETL-i torujuhtme ehitamiseks.

Kasutame Minas Gerais' (Brasiilia) pealinna Belo Horizonte (BH) liiklusandurite andmeid. See on tohutu andmekogum, mis sisaldab liiklusvoo mõõtmisi mitmes linna kohas. Iga andur tuvastab perioodiliselt selles kohas sõitva sõiduki tüübi (auto, mootorratas, buss/veoauto), selle kiiruse ja pikkuse (ja muu teabe, mida me ei kavatse kasutada).

See andmestik esindab täpselt üht voogedastussüsteemide klassikalist rakendust – andurite rühma, mis saadavad pidevalt oma näidud põllult.

Selle stsenaariumi korral saab Apache Kafkat kasutada andurite ja nende andmeid tarbivate rakenduste vahelise abstraktsioonikihina.

Kafkat kasutatakse allikate ja teenuste vahelise abstraktsioonikihina. Pilt autorilt.

Sellise infrastruktuuriga on võimalik ehitada igasuguseid (nn. reaalajas sündmustepõhised süsteemid, nagu programm liiklusummikute tuvastamiseks ja nendest hoiatamiseks, kui sõidukite arv keskmise kiiruse langusega järsult suureneb.

Ja siin tuleb mängu Apache Spark.

Sellel on voo töötlemiseks natiivne moodul, mida nimetatakse Spark-struktureeritud voogesitus, mis saab Kafkaga ühenduse luua ja selle sõnumeid töödelda.

Keskkonna seadistamine

Kõik, mida vajate, on dokkija ja dokkimiskomponeerimine.

Kasutame dockeri koostamise failikonfiguratsiooni, mis põhineb järgmistel hoidlatel: lingi säde, link kafka.

. ./src maht on koht, kuhu me oma skriptid paneme.

Keskkonna käivitamiseks lihtsalt jookske

docker - koostama üles

Kogu kood on siin saadaval GitHubi hoidla.

Üks asi, mis mulle Sparki õppima asudes enim meeldis, oli sarnasus selle jaoks kirjutatud koodi ja minu tavaliste python+pandas skriptide vahel. Väga lihtne oli rännata.

Sama loogikat järgides on Sparki voogedastusmoodul väga sarnane tavalisele sädekoodile, muutes partiirakendustelt voogesitusrakendustele migreerumise lihtsaks.

Seda arvestades keskendume järgmistes jaotistes Sparki struktureeritud voogesituse eripärade õppimisele, st sellele, millised uued funktsioonid sellel on.

Meie esimene töökoht

Alustame aeglaselt ja ehitame mänguasja näite

Esimese asjana tuleb luua Kafka teema, kust meie sädetöö sõnumeid tarbib.

Seda teeb pääseb Kafka konteinerterminali ja teostab:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Selle teema kohta sõnumeid kirjutava tootja simuleerimiseks kasutame kafka-konsool-tootja. Samuti konteineri sees:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --atribuut "parse.key=true" --atribuut "key.separator=:"

Nüüdsest saadetakse iga terminali sisestatud rida sõnumina testiteemale. Märgi ":" kasutatakse sõnumi võtme ja väärtuse (võti:väärtus) eraldamiseks.

Loome selle teema tarbimiseks Sparki töö.

Kood tuleb sisestada /src/streaming kaust (ei midagi erilist, lihtsalt kaust, mille valisin).

Oluline on märkida, et me kasutame atribuute loe Voog ja writeStream, tavalise lugemise ja kirjutamise asemel. See on peamine aspekt, mis paneb Sparki käsitlema meie tööd voogesitusrakendusena.

Kafkaga ühenduse loomiseks on vaja määrata server ja teema. Valik startOffsets=“kõige varem” käsib Sparkil teema algusest peale läbi lugeda. Ka sellepärast, et Kafka talletab oma sõnumid binaarne kujul, tuleb need dekodeerida nöör.

Ülejäänud võimalusi uuritakse edasi.

Nüüd pääseme Sparki konteinerile ja käivitame töö.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Mõne sekundi pärast seadistamist hakkab see teemat ajama.

Säde tarbivad sõnumeid Kafkalt. Pilt autorilt.

Spark Streaming töötab mikropartiid režiimis ja seetõttu näeme sõnumeid tarbides "partiiteavet".

Mikropakkimine on mõnevõrra täieliku "tõelise" voogedastuse, kus kõiki sõnumeid töödeldakse saabumisel individuaalselt, ja tavalise partii vahel, kus andmed jäävad staatiliseks ja neid tarbitakse nõudmisel. Spark ootab mõnda aega, üritades sõnumeid koguda, et neid koos töödelda, vähendades üldkulusid ja suurendades latentsust. Seda saab häälestada vastavalt teie vajadustele.

Ma ei ole ülikiire tippija, nii et Spark töötleb sõnumit enne, kui saan praegusesse partiisse uusi lisada.

Ja see oli meie esimene voogedastustöö!

Loodan, et saate tunde: vootöötluse tööd pole raske kodeerida, kuid seal on probleeme.

Andmete kirjutamine Kafka voogu

Nüüd on aeg hakata anduriandmetega mängima.

Võite alla laadida tõmblukk faili AUGUST 2022 ja ekstraktige see kausta / andmed maht. Andmed on algselt JSON-is ja võtavad umbes 23 Gb ruumi. Esimene asi, mida teha, on muuta see parketiks, et optimeerida kettaruumi ja lugemisaega.

Selle tegemiseks vajalikud sädetööd on üksikasjalikult kirjeldatud GitHubi hoidlas, kõik, mida pead tegema, on need käivitada:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Sõltuvalt teie masinast võib täitmine veidi aega võtta. Aga tasub end ära, lõplik parketifaili suurus on ~1Gb (üle 20x väiksem) ja palju kiiremini loetav.

Samuti peame oma sõnumite saamiseks looma Kafka teema:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic liiklussensor

Valikuliselt, kui soovite saabuvaid sõnumeid kuvada, on võimalik seadistada konsoolitarbija.

kafka-console-consumer.sh --topic liiklussensor --bootstrap-server localhost:9092

Kafka teema kohta andmete kirjutamine on lihtne, kuid sisaldab mõningaid üksikasju.

Struktureeritud voogesituse puhul ei üritata andmeskeemi (veerud ja nende tüübid) tuletada, seega peame selle edastama.

Kafka sõnumid on lihtsalt võtme-väärtuse binaarstringi paarid, seega peame oma andmeid selles vormingus esitama. Seda saab hõlpsasti saavutada, teisendades kõik read JSON-stringideks, kodeerides need binaarselt ja salvestades tulemuse veergu "väärtus".

Veergude teisendamine JSON-stringideks. Pilt autorilt.

Sõnumiklahvid on Kafkas väga olulised, kuid need ei ole meie testides kasulikud, nii et kõigil sõnumitel on sama.

Nagu varem mainitud, on see andmestik SUUR, seega piirasin sisestatud sõnumite arvu 500,000 XNUMX-ni.

Lõpuks läbime Kafka serveri ja teema ning „kontrollpunkti asukoht”, kuhu säde salvestab täitmise edenemise, mis on kasulik vigadest taastumiseks.

Töö teostamine:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Andmete sisestamine Kafkasse. Pilt autorilt.

Vasakul Sparki töö loeb faili, paremal a kafka-konsool-tarbija kuvab saabuvad teated.

Meie liiklusteema on asustatud ja peaaegu töötlemiseks valmis.

Oluline on meeles pidada, et kasutasime oma teema sisustamiseks sädetööd ainult õppimise eesmärgil. Reaalse stsenaariumi korral saadavad andurid ise näidud otse Kafkale.

Selle dünaamilise käitumise simuleerimiseks kirjutab allolev skript teemasse 1 rea iga 2.5 sekundi järel.

Väljundrežiimid — sõidukite arvu loendamine tüübi järgi

Edasi liikudes loome töökoha sõidukite arvu loendamiseks tüübi järgi.

Veerg “Classificação” (klassifikatsioon) sisaldab tuvastatud sõidukitüüpi.

Teemast lugedes peame JSON-i binaarstringid tagasi veeruvormingusse teisendama.

Kui see on tehtud, saab päringu tavapäraselt üles ehitada. Huvitav on märkida, et päringu süda on lihtsalt valima,groupBy,loe() jada, kõik ülejäänud on seotud voogesituse loogikaga.

Seega on aeg käsitleda väljundrežiim() valik.

Voorakenduse väljundrežiim määrab, kuidas me tahame tulemusi (ümber) arvutada ja kirjutada uute andmete saabumisel.

See võib eeldada kolme erinevat väärtust:

  • Lisa: lisage väljundisse ainult uusi kirjeid.
  • täielik: arvutage iga uue rekordi täistulemus uuesti.
  • Värskendused: värskendage muudetud kirjeid.

Need režiimid võivad olenevalt kirjutatud rakendusest olla mõttekad või mitte. Näiteks ei pruugi režiimil "täielik" olla mõtet, kui rühmitatakse või sorteeritakse.

Teostame töö režiimis "täielik" ja vaatame tulemusi.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — veoauto, mootorsõiduk, määramata-määratlemata, bussi-buss, mootor-mootorratas. Pilt autorilt.

Kui voogu lisatakse uusi kirjeid (vt terminali paremal), arvutab töö täistulemuse ümber. See võib olla kasulik olukordades, kus ridade järjestamine on oluline (nt paremusjärjestus või konkurents).

See lähenemine ei pruugi aga olla optimaalne, kui rühmade arv on liiga suur või üksikud muudatused ei mõjuta üldist tulemust.

Seega on veel üks võimalus kasutada "värskendamise" väljundrežiimi, mis genereerib uue sõnumi ainult muutunud rühmade jaoks. Vaata allpool:

Päring väljundrežiimiga "värskenda". Pilt autorilt.

Lisamisrežiim pole rühmitamisega päringute jaoks saadaval, seega ei saa ma kuvada sama tööd kasutades. Kuid ma arvan, et see on kõige lihtsam režiim alati lisab väljundisse uue kirje.

Neid väljundrežiime on lihtsam mõista, kui mõtlete tulemuste tabelisse salvestamisele. Täielikus väljundrežiimis kirjutatakse tabel ümber iga uue töödeldud sõnumi kohta, värskendusrežiimis ainult need read, kus mõni värskendus toimus, ja lisa lisab alati uue rea lõppu.

Tumbling time window – koondamine ajavahemike abil

Voogedastussüsteemides on sõnumitel kaks erinevat ajatemplit: Sündmuse aeg — sõnumi loomise aeg, meie puhul anduri lugemisaeg ja Töötlemisaeg — kui töötlusagent loeb sõnumit, meie puhul see jõuab Sparki.

Vootöötlustööriistade oluline omadus on võimalus käsitleda sündmuste ajatöötlust. Pööravad aknad on mittekattuvad fikseeritud ajavahemikud, mida kasutatakse sündmuse-aja veergude abil liitmiste tegemiseks. Lihtsamalt öeldes jagavad nad ajaskaala võrdse suurusega viiludeks, nii et iga sündmus kuuluks ühte intervalli.

Näiteks loendage iga 5 minuti järel, mitu sõidukit tuvastati viimase 5 minuti jooksul.

5-minutiline vajuv aken. Pilt autorilt.

Allolev kood illustreerib seda:

Selline töötlemine võib olla paljudes olukordades äärmiselt kasulik. Tulles tagasi varem pakutud ummikuanduri juurde, siis üks võimalik lähenemine on mõõta sõidukite keskmist kiirust 10 min aknas ja vaadata, kas see jääb alla teatud läve.

Sündmuste aegne töötlemine on keeruline teema. Sellega tegelemisel võib juhtuda kõike, näiteks sõnumite kadumist, liiga hilja saabumist või rivist väljalangemist. Sparkil on mitu mehhanismi probleemide leevendamiseks, näiteks vesimärgid, millele me ei keskendu.

Ajaaknaid saab kasutada ka koos teiste veergudega groupBy(). Allolev näide loendab sõidukite arvu tüübi järgi 5-minutilises aknas.

Libisev ajaaken — ajavahemike paindlikumaks muutmine

Libistavad ajaaknad on akende paindlikkus. Mittekattuvate intervallide loomise asemel võimaldavad need määrata, kui sageli iga intervall luuakse.

Näiteks loendage iga 5 minuti järel, mitu sõidukit viimase 30 minuti jooksul tuvastati.

Seetõttu võivad sündmused kuuluda mitmesse intervalli ja neid loendada nii mitu korda kui vaja.

Liugakna määratlemiseks edastage lihtsalt värskendusintervall aken()funktsioon.

Vaatame väljundit.

Nagu näeme, luuakse iga 30 minuti järel 5-minutilised aknad.

See paindlikkus võib olla üsna kasulik spetsiifilisemate ärireeglite ja keerukamate käivitajate määratlemiseks. Näiteks võib meie liiklusummikuandur saata vastuseid iga 5 sekundi järel viimase 10 minuti kohta ja luua hoiatuse, kui auto keskmine kiirus langeb alla 20 km/h.

See oli kiire ülevaade Spark Structured Streamingi põhikontseptsioonidest ja sellest, kuidas neid Kafkaga rakendada.

Apache Kafka ja Apache Spark on mõlemad usaldusväärsed ja tugevad tööriistad, mida paljud ettevõtted kasutavad uskumatute andmemahtude igapäevaseks töötlemiseks, muutes need vootöötluse ülesandes üheks tugevaimaks paariks.

Oleme õppinud, kuidas Sparki tööde abil Kafka teemasid täita, tarbida ja töödelda. See ei olnud raske ülesanne, nagu postituses mainitud, on vootöötluse API peaaegu võrdne tavalise partii-API-ga, vaid mõne väiksema muudatusega.

Oleme arutanud ka erinevaid väljundrežiime, voogesitusrakendustele spetsiifilist asja ja seda, kuidas neid saab kasutada. Viimaseks, kuid mitte vähem tähtsaks, uurisime ajaakendega liite, mis on üks vootöötluse peamisi võimalusi.

Jällegi, see oli lihtsalt kiire pilk ja kui soovite põhjalikumalt uurida, jätan allpool mõned viited.

Loodetavasti aitasin kuidagi, tänan, et lugesite! 🙂

Kogu kood on siin saadaval GitHubi hoidla.
Kasutatud andmed —
Contagens Volumétricas de Radares, Avatud andmed, Brasiilia kuberner.

[1] Funktsioon Deep Dive: vesimärgid Apache Spark Structured Streamingis - Max Fisher Databricksi ajaveebis
[2] Chambers, B. ja Zaharia, M. (2018). Spark: lõplik juhend: suur andmetöötlus on tehtud lihtsaks. "O'Reilly Media, Inc."
[3] Reaalajas logistika, kohaletoimetamine ja transport Apache Kafkaga— Kai Waehner
[4] Esinevad Apache Kafka Netflixi stuudios ja finantsmaailmas — Ladustav ajaveeb
[5] Spark Streaming ja Kafka — https://sparkbyexamples.com/

Kiire ülevaade Spark Structured Streaming + Kafka uuesti avaldatud allikast https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 kaudu https:/ /towardsdatascience.com/feed

<!–

->

Ajatempel:

Veel alates Blockchaini konsultandid