Hiter pogled na Spark Structured Streaming + Kafka

Učenje osnov uporabe tega zmogljivega dua za opravila obdelave toka

Foto: Nikhita Singhal on Unsplash

Pred kratkim sem začel veliko študirati o Apache Kafka in Apache Spark, dveh vodilnih tehnologijah v svetu podatkovnega inženiringa.

V zadnjih nekaj mesecih sem naredil več projektov z njihovo uporabo; “Pretakanje strojnega učenja s Kafko, Debeziumom in BentoML” je primer. Osredotočam se na to, da se naučim ustvariti zmogljive podatkovne cevovode s temi sodobnimi znanimi orodji in pridobim občutek o njihovih prednostih in slabostih.

V zadnjih mesecih sem že obravnaval, kako ustvariti cevovode ETL z uporabo obeh orodij, vendar nikoli skupaj, in to je vrzel, ki jo bom zapolnil danes.

Naš cilj je spoznati splošno idejo za izdelavo pretočne aplikacije s Spark+Kafka in si na hitro ogledati njene glavne koncepte z uporabo resničnih podatkov.

Ideja je preprosta – Apache Kafka je orodje za pretakanje sporočil, kjer proizvajalci pišejo sporočila na enem koncu čakalne vrste (imenovane temo), da jih potrošniki berejo na drugi strani.

Toda to je zelo zapleteno orodje, zgrajeno kot prožna storitev porazdeljenega sporočanja z vsemi vrstami jamstev za dostavo (natančno enkrat, enkrat, poljubno), shranjevanjem sporočil in podvajanjem sporočil, hkrati pa omogoča prilagodljivost, razširljivost in visoko prepustnost. Ima širši nabor primerov uporabe, kot so komunikacija mikrostoritev, sistemi dogodkov v realnem času in pretočni cevovodi ETL.

Apache Spark je motor za pretvorbo podatkov, ki temelji na porazdeljenem pomnilniku.

Je tudi zelo zapleteno orodje, ki se lahko poveže z vsemi vrstami podatkovnih baz, datotečnih sistemov in infrastrukture v oblaku. Prirejen je za delovanje v porazdeljenih okoljih za vzporedno obdelavo med stroji, doseganje visoko zmogljivih transformacij z uporabo svoje filozofije lenega ocenjevanja in optimizacij poizvedb.

Kul del pri tem je, da je do konca dneva koda le vaša običajna poizvedba SQL ali (skoraj) vaš skript Python+pandas, z vsem čarovništvom, povzetim pod prijetnim uporabniku prijaznim API-jem na visoki ravni.

Pridružite se tema dvema tehnologijama in dobili bomo popolno ujemanje za izgradnjo pretočnega cevovoda ETL.

Uporabili bomo podatke prometnih senzorjev v mestu Belo Horizonte (BH), glavnem mestu Minas Gerais (Brazilija). To je ogromen nabor podatkov, ki vsebuje meritve pretoka prometa na več mestih v mestu. Vsak senzor občasno zazna vrsto vozila, ki vozi na tej lokaciji (avtomobil, motorno kolo, avtobus/tovornjak), njegovo hitrost in dolžino (in druge podatke, ki jih ne bomo uporabili).

Ta nabor podatkov predstavlja natanko eno od klasičnih aplikacij za pretočne sisteme – skupino senzorjev, ki nenehno pošiljajo svoje odčitke s terena.

V tem scenariju se lahko Apache Kafka uporablja kot abstraktna plast med senzorji in aplikacijami, ki uporabljajo njihove podatke.

Kafka uporabljen kot abstraktna plast med viri in storitvami. Slika avtorja.

S tovrstno infrastrukturo je možno graditi vse vrste (t.i.) sistemi, ki temeljijo na dogodkih v realnem času, kot program za zaznavanje in opozarjanje na prometne zastoje, ko se število vozil nenadoma poveča s padcem povprečne hitrosti.

In tu nastopi Apache Spark.

Ima izvorni modul za obdelavo toka, imenovan Spark Structured Streaming, ki se lahko poveže s Kafko in obdeluje njena sporočila.

Nastavitev okolja

Vse, kar potrebujete, sta docker in docker-compose.

Uporabili bomo konfiguracijo datoteke docker-compose, ki temelji na naslednjih repozitorijih: povezava iskra, povezava kafka.

O ./src volumen je mesto, kamor bomo postavili naše skripte.

Če želite zagnati okolje, samo zaženite

docker-compose up

V tem je na voljo vsa koda GitHub repozitorij.

Ena od stvari, ki mi je bila najbolj všeč, ko sem začel študirati Spark, je bila podobnost med napisano kodo zanj in mojimi običajnimi skripti python+pandas. Preseliti se je bilo zelo enostavno.

Po isti logiki je pretočni modul Spark zelo podoben običajni kodi spark, kar olajša selitev s paketnih aplikacij na pretočne.

Glede na to se bomo v naslednjih razdelkih osredotočili na učenje posebnosti strukturiranega pretakanja Spark, tj., katere nove funkcije ima.

Naša prva služba

Začnimo počasi in zgradimo primer igrače

Prva stvar, ki jo morate storiti, je ustvariti Kafkino temo, od koder bo naša iskrica zaužila sporočila.

To naredi z dostop do kontejnerskega terminala Kafka in izvajanje:

kafka-topics.sh --create --bootstrap-server localhost:9092 --tema testna_tema

Za simulacijo producenta, ki piše sporočila o tej temi, uporabimo kafka-proizvajalec konzole. Tudi v posodi:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Odslej bo vsaka vrstica, vnesena v terminal, poslana kot sporočilo v testno temo. Znak “:” se uporablja za ločevanje ključa in vrednosti sporočila (ključ:vrednost).

Ustvarimo službo Spark za uporabo te teme.

Kodo je treba vstaviti znotraj /src/pretakanje mapa (nič posebnega, samo mapa, ki sem jo izbral).

Ključna stvar, ki jo je treba upoštevati, je, da uporabljamo atribute readStream in writeStream, namesto običajnega branja in pisanja. To je glavni vidik, zaradi katerega Spark naše delo obravnava kot aplikacijo za pretakanje.

Za povezavo s Kafko je potrebno določiti strežnik in temo. Možnost začetni odmiki=“earliest« sporoči Sparku, naj prebere temo od začetka. Tudi zato, ker Kafka svoja sporočila hrani v binarni obliki, jih je treba dekodirati niz.

Druge možnosti bodo dodatno raziskane.

Zdaj pa dostopimo do vsebnika Spark in zaženimo opravilo.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Po nekaj sekundah konfiguracije bo začel uporabljati temo.

Kafkova sporočila, ki jemajo iskre. Slika avtorja.

Spark Streaming deluje v mikro šarža in zato vidimo informacije o "paketu", ko ta porabi sporočila.

Mikropaket je nekaj med popolnim »pravim« pretakanjem, kjer se vsa sporočila obdelajo posamično, ko prispejo, in običajnim paketom, kjer podatki ostanejo statični in se porabijo na zahtevo. Spark bo počakal nekaj časa in poskušal zbrati sporočila, da bi jih skupaj obdelal, kar bo zmanjšalo stroške in povečalo zakasnitev. To je mogoče prilagoditi vašim potrebam.

Nisem zelo hiter tipkalec, zato Spark obdela sporočilo, preden lahko v trenutni paket vključim nova.

In to je bilo naše prvo pretakanje!

Upam, da imate občutek: ni težko kodirati opravila za obdelavo toka, vendar obstajajo nekatere težave.

Zapisovanje podatkov v Kafkov tok

Zdaj je čas, da se začnete igrati s podatki senzorja.

Lahko prenesete Zadrga datoteko iz AVGUSTA 2022 in jo ekstrahirajte v / podatki glasnost. Podatki so izvirno v JSON in zavzamejo približno 23 Gb prostora. Najprej ga morate pretvoriti v parket, da optimizirate prostor na disku in čas branja.

Opravila spark za to so podrobno opisana v repozitoriju GitHub, vse kar morate storiti je, da jih izvedete:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Odvisno od vaše naprave lahko izvedba traja nekaj časa. Ampak se splača, končna velikost datoteke parketa je ~1Gb (več kot 20x manjša) in veliko hitrejša za branje.

Za prejemanje naših sporočil moramo ustvariti tudi temo Kafka:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Če želite prikazati prispela sporočila, lahko po želji nastavite porabnika konzole.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Pisanje podatkov o Kafkini temi je enostavno, vendar vsebuje nekaj podrobnosti.

Pri strukturiranem pretakanju je privzeto vedenje, da se podatkovna shema (stolpci in njihovi tipi) ne poskuša sklepati, zato jo moramo posredovati.

Sporočila Kafka so samo pari binarnih nizov ključ-vrednost, zato moramo svoje podatke predstaviti v tej obliki. To je mogoče enostavno doseči s pretvorbo vseh vrstic v nize JSON, kodiranjem v dvojiški obliki in shranjevanjem rezultata v stolpec »vrednost«.

Pretvorba stolpcev v nize JSON. Slika avtorja.

Ključi sporočil so zelo pomembni v Kafki, vendar ne bodo uporabni v naših testih, zato bodo vsa sporočila enaka.

Kot že omenjeno, je ta nabor podatkov Ogromen, zato sem omejil število vstavljenih sporočil na 500,000.

Nazadnje posredujemo strežnik Kafka in temo ter »checkpointLocation«, kjer bo iskra shranila napredek izvajanja, kar je uporabno za okrevanje po napakah.

Izvajanje dela:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Vstavljanje podatkov v Kafko. Slika avtorja.

Na levi strani opravilo Spark bere datoteko, na desni pa a kafka-konzola-potrošnik prikaže prispela sporočila.

Naša prometna tema je izpolnjena in skoraj pripravljena za obdelavo.

Pomembno si je zapomniti, da smo za zapolnitev naše teme uporabili nalogo spark zgolj za namene učenja. V resničnem scenariju bodo senzorji sami pošiljali odčitke neposredno Kafki.

Za simulacijo tega dinamičnega obnašanja spodnji skript zapiše 1 vrstico v temo vsaki 2.5 sekunde.

Izhodni načini — štetje števila vozil glede na vrsto

Če nadaljujemo, ustvarimo nalogo za štetje števila vozil po vrsti.

Stolpec „Classificação“ (Razvrstitev) vsebuje zaznan tip vozila.

Kot beremo iz teme, moramo binarne nize JSON pretvoriti nazaj v stolpčno obliko.

Ko je to storjeno, lahko poizvedbo sestavite kot običajno. Zanimivo je, da je srce poizvedbe samo izberite().groupby().štetje(), vse ostalo je relativno glede na logiko pretakanja.

Torej je čas, da se posvetimo outputMode() možnost.

Izhodni način pretočne aplikacije določa, kako želimo (ponovno) izračunati in zapisati rezultate, ko prispejo novi podatki.

Privzame lahko tri različne vrednosti:

  • Doda: Izpisu dodajte samo nove zapise.
  • Dokončati: Ponovno izračunajte celoten rezultat za vsak nov zapis.
  • Nadgradnja: Posodobi spremenjene zapise.

Ti načini so lahko smiselni ali ne, odvisno od napisane aplikacije. Na primer, »popoln« način morda ni smiseln, če se izvede kakršno koli združevanje ali razvrščanje.

Izvedimo nalogo v načinu »dokončano« in poglejmo rezultate.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Tovornjak, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Slika avtorja.

Ko so novi zapisi vstavljeni v tok (glejte terminal na desni), opravilo znova izračuna celoten rezultat. To je lahko uporabno v situacijah, ko je vrstni red pomemben, kot je razvrstitev ali tekmovanje.

Vendar ta pristop morda ne bo optimalen, če je število skupin preveliko ali posamezne spremembe ne vplivajo na skupni rezultat.

Druga možnost je torej uporaba izhodnega načina »posodobi«, ki ustvari novo sporočilo samo za spremenjene skupine. Glej spodaj:

Poizvedba z izhodnim načinom »posodobitev«. Slika avtorja.

Način »dodaj« ni na voljo za poizvedbe z združevanjem, zato ne bom mogel prikazati z uporabo istega opravila. Mislim pa, da je to najpreprostejši način vedno izhodu doda nov zapis.

Te izhodne načine je lažje razumeti, če razmišljate o shranjevanju rezultatov v tabelo. V načinu celotnega izpisa bo tabela prepisana za vsako novo obdelano sporočilo, v načinu posodobitve samo vrstice, kjer je prišlo do posodobitve, dodajanje pa bo na konec vedno dodalo novo vrstico.

Časovno okno vrtenja — Seštevanje z uporabo časovnih intervalov

V pretočnih sistemih imajo sporočila dva različna časovna žiga, povezana z njimi: čas dogodka — čas, ko je bilo sporočilo ustvarjeno, v našem primeru čas branja senzorja, in čas obdelave — ko agent za obdelavo prebere sporočilo, v našem primeru ko doseže Spark.

Pomembna značilnost orodij za obdelavo toka je zmožnost obdelave časa dogodka. Prevračajoča se okna so neprekrivajoči se fiksni časovni intervali, ki se uporabljajo za združevanje z uporabo stolpcev s časom dogodkov. Preprosteje rečeno, časovnico razrežejo na enako velike rezine, tako da vsak dogodek pripada enemu intervalu.

Na primer, vsakih 5 minut preštejte, koliko vozil je bilo zaznanih v zadnjih 5 minutah.

5-minutno vrtljivo okno. Slika avtorja.

Spodnja koda to ponazarja:

Tovrstna obdelava je lahko zelo uporabna v številnih situacijah. Če se vrnemo k prej predlaganemu detektorju prometnih zastojev, je eden od možnih pristopov merjenje povprečne hitrosti vozil v 10-minutnem oknu in ugotavljanje, ali je pod določenim pragom.

Dogodkovno-časovna obdelava je kompleksna tema. Pri obravnavi se lahko zgodi vse, na primer izgubljena sporočila, prepozen prihod ali okvara. Spark ima več mehanizmov, s katerimi poskuša ublažiti težave, npr vodnih žigov, na katerega se ne bomo osredotočali.

Časovna okna lahko uporabite tudi v povezavi z drugimi stolpci v groupBy(). Spodnji primer šteje število vozil glede na vrsto v 5-minutnem oknu.

Drseče časovno okno — Fleksibilizacija časovnih intervalov

Drsna časovna okna so fleksibilizacija padajočih oken. Namesto ustvarjanja neprekrivajočih se intervalov omogočajo določanje, kako pogosto bo vsak interval ustvarjen.

Na primer, vsakih 5 minut preštejte, koliko vozil je bilo zaznanih v zadnjih 30 minutah.

Zaradi tega lahko dogodki pripadajo številnim intervalom in se štejejo tolikokrat, kot je potrebno.

Če želite določiti drsno okno, samo posredujte interval posodabljanja okno()funkcija.

Poglejmo rezultat.

Kot lahko vidimo, se vsakih 30 minut ustvarijo 5-minutna okna.

Ta prilagodljivost je lahko zelo uporabna za definiranje bolj specifičnih poslovnih pravil in bolj zapletenih sprožilcev. Na primer, naš detektor prometnih zastojev lahko pošlje odgovore vsakih 5 sekund približno zadnjih 10 minut in ustvari opozorilo, ko povprečna hitrost avtomobila pade pod 20 km/h.

To je bil hiter pogled na glavne koncepte Spark Structured Streaming in kako jih je mogoče uporabiti s Kafko.

Apache Kafka in Apache Spark sta zanesljivi in ​​robustni orodji, ki ju mnoga podjetja uporabljajo za vsakodnevno obdelavo neverjetnih količin podatkov, zaradi česar sta eden najmočnejših parov v nalogi obdelave toka.

Naučili smo se, kako zapolniti, porabiti in obdelati Kafkine teme z uporabo delovnih mest Spark. To ni bila težka naloga, kot je omenjeno v objavi, je API za obdelavo toka skoraj enak običajnemu paketnemu API-ju, le z nekaj manjšimi prilagoditvami.

Razpravljali smo tudi o različnih izhodnih načinih, o nečem specifičnem za pretočne aplikacije, in o tem, kako je mogoče vsakega uporabiti. Nenazadnje smo raziskali združevanje s časovnimi okni, eno glavnih zmožnosti obdelave toka.

Še enkrat, to je bil samo hiter pogled in spodaj bom pustil nekaj referenc, če želite raziskati globlje.

Upam, da sem kakorkoli pomagal, hvala za branje! 🙂

V tem je na voljo vsa koda GitHub repozitorij.
Uporabljeni podatki —
Contagens Volumétricas de Radares, Odprti podatki, brazilski guverner

[1] Funkcija Deep Dive: vodni žig v strukturiranem pretakanju Apache Spark — Max Fisher na blogu Databricks
[2] Chambers, B., & Zaharia, M. (2018). Spark: dokončni vodnik: preprosta obdelava velikih podatkov. “O'Reilly Media, Inc.”.
[3] Logistika, pošiljanje in transport v realnem času z Apache Kafka— Kai Waehner
[4] Z Apache Kafko v studiu Netflix in svetu financ — Confluent blog
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

Hiter pogled na Spark Structured Streaming + Kafka Ponovno objavljeno iz vira https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 prek https:/ /towardsdatascience.com/feed

<!–

->

Časovni žig:

Več od Svetovalci v verigi blokov